You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by michaelandrepearce <gi...@git.apache.org> on 2019/01/07 10:36:51 UTC

[GitHub] activemq-artemis pull request #2490: V2 196

GitHub user michaelandrepearce opened a pull request:

    https://github.com/apache/activemq-artemis/pull/2490

    V2 196

    @franz1981 an alternative so we don't have to have a copy of CopyOnWriteArrayList, it does mean on add or remove consumer we have to invoke toArray which causes a copy, but this is not on hot path, so i think we should be good, and avoids us having to clone a jvm class.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/michaelandrepearce/activemq-artemis V2-196

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/activemq-artemis/pull/2490.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2490
    
----
commit d731ffe7288cb857fef1b97deff4b7dc18aeb6d7
Author: Michael André Pearce <mi...@...>
Date:   2018-12-31T13:22:02Z

    ARTEMIS-196 Implement Consumer Priority
    
    Add consumer priority support
    Includes refactor of consumer iterating in QueueImpl to its own logical class, to be able to implement.
    Add OpenWire JMS Test - taken from ActiveMQ5
    Add Core JMS Test
    Add AMQP Test
    Add Docs

commit b0c775840fc98b5d3f5f3485802de3270c614d9a
Author: Michael André Pearce <mi...@...>
Date:   2019-01-05T09:48:24Z

    Extract

----


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246555304
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java ---
    @@ -0,0 +1,124 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.core.server.impl;
    +
    +import org.apache.activemq.artemis.core.PriorityAware;
    +import org.apache.activemq.artemis.utils.collections.PriorityCollection;
    +import org.apache.activemq.artemis.utils.collections.ResettableIterator;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.Set;
    +import java.util.Spliterator;
    +import java.util.concurrent.CopyOnWriteArraySet;
    +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
    +import java.util.function.Consumer;
    +
    +/**
    + * This class's purpose is to hold the consumers.
    + *
    + * CopyOnWriteArraySet is used as the underlying collection to the PriorityCollection, as it is concurrent safe,
    + * but also lock less for a read path, which is our HOT path.
    + * Also it was the underlying collection previously used in QueueImpl, before we abstracted it out to support priority consumers.
    + *
    + * There can only be one resettable iterable view,
    + * A new iterable view is created on modification, this is to keep the read HOT path performent, BUT
    + * the iterable view changes only after reset so changes in the underlying collection are only seen after a reset,
    + *
    + * All other iterators created by iterators() method are not reset-able and are created on delegating iterator().
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware,
    + *         but intent is this is the QueueImpl:ConsumerHolder.
    + */
    +public class QueueConsumersImpl<T extends PriorityAware> implements QueueConsumers<T> {
    +
    +   private final PriorityCollection<T> consumers = new PriorityCollection<>(CopyOnWriteArraySet::new);
    +   private final Collection<T> unmodifiableConsumers = Collections.unmodifiableCollection(consumers);
    +   private final AtomicReferenceFieldUpdater<QueueConsumersImpl, ResettableIterator> changedIteratorFieldUpdater =  AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, ResettableIterator.class, "changedIterator");
    +   private volatile ResettableIterator<T> changedIterator;
    +   private ResettableIterator<T> currentIterator = consumers.resettableIterator();
    +
    +   @Override
    +   public Set<Integer> getPriorites() {
    +      return consumers.getPriorites();
    +   }
    +
    +   @Override
    +   public boolean hasNext() {
    +      return currentIterator.hasNext();
    +   }
    +
    +   @Override
    +   public T next() {
    +      return currentIterator.next();
    +   }
    +
    +   @Override
    +   public QueueConsumers<T> reset() {
    +      if (changedIterator != null) {
    +         currentIterator = changedIterator;
    +         changedIteratorFieldUpdater.compareAndSet(this, changedIterator, null);
    +      } else {
    +         currentIterator.reset();
    +      }
    +      return this;
    +   }
    +
    +   @Override
    +   public boolean add(T t) {
    +      boolean result = consumers.add(t);
    +      if (result) {
    +         changedIteratorFieldUpdater.set(this, consumers.resettableIterator());
    --- End diff --
    
    see other comment


---

[GitHub] activemq-artemis issue #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on the issue:

    https://github.com/apache/activemq-artemis/pull/2490
  
    @franz1981 pushed following changes based on your comments that agree with, for others i have left comment for us to discuss. 
    
    Changes:
    1) Ensured total size of the priority collection can never exceed Integer.MAX_VALUE, by ensuring this on add, thus the edge case you were worried about of the calcSize being greater than int cannot occur. Note if someone has that many consumers, we probably want to have some discussions with them as they would be some power user!!! ;)
    
    2)  Avoid double volatile read of changedIterator in reset method.
    
    3) Removed need for a cast in MultiResettableIterator


---

[GitHub] activemq-artemis issue #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on the issue:

    https://github.com/apache/activemq-artemis/pull/2490
  
    @gemmellr pushed changes based on latest comments, thanks.


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246322551
  
    --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java ---
    @@ -0,0 +1,78 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.utils.collections;
    +
    +import java.util.Iterator;
    +
    +/**
    + * Provides an Iterator that works over multiple underlying iterators.
    + *
    + * @param <T> type of the class of the iterator.
    + */
    +public class MultiIterator<T> implements Iterator<T> {
    +
    +   private final Iterator<T>[] iterators;
    --- End diff --
    
    `I[]`: comments above


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246516479
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java ---
    @@ -0,0 +1,124 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.core.server.impl;
    +
    +import org.apache.activemq.artemis.core.PriorityAware;
    +import org.apache.activemq.artemis.utils.collections.PriorityCollection;
    +import org.apache.activemq.artemis.utils.collections.ResettableIterator;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.Set;
    +import java.util.Spliterator;
    +import java.util.concurrent.CopyOnWriteArraySet;
    +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
    +import java.util.function.Consumer;
    +
    +/**
    + * This class's purpose is to hold the consumers.
    + *
    + * CopyOnWriteArraySet is used as the underlying collection to the PriorityCollection, as it is concurrent safe,
    + * but also lock less for a read path, which is our HOT path.
    + * Also it was the underlying collection previously used in QueueImpl, before we abstracted it out to support priority consumers.
    + *
    + * There can only be one resettable iterable view,
    + * A new iterable view is created on modification, this is to keep the read HOT path performent, BUT
    + * the iterable view changes only after reset so changes in the underlying collection are only seen after a reset,
    + *
    + * All other iterators created by iterators() method are not reset-able and are created on delegating iterator().
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware,
    + *         but intent is this is the QueueImpl:ConsumerHolder.
    + */
    +public class QueueConsumersImpl<T extends PriorityAware> implements QueueConsumers<T> {
    +
    +   private final PriorityCollection<T> consumers = new PriorityCollection<>(CopyOnWriteArraySet::new);
    +   private final Collection<T> unmodifiableConsumers = Collections.unmodifiableCollection(consumers);
    +   private final AtomicReferenceFieldUpdater<QueueConsumersImpl, ResettableIterator> changedIteratorFieldUpdater =  AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, ResettableIterator.class, "changedIterator");
    +   private volatile ResettableIterator<T> changedIterator;
    +   private ResettableIterator<T> currentIterator = consumers.resettableIterator();
    +
    +   @Override
    +   public Set<Integer> getPriorites() {
    +      return consumers.getPriorites();
    +   }
    +
    +   @Override
    +   public boolean hasNext() {
    +      return currentIterator.hasNext();
    +   }
    +
    +   @Override
    +   public T next() {
    +      return currentIterator.next();
    +   }
    +
    +   @Override
    +   public QueueConsumers<T> reset() {
    +      if (changedIterator != null) {
    +         currentIterator = changedIterator;
    +         changedIteratorFieldUpdater.compareAndSet(this, changedIterator, null);
    +      } else {
    +         currentIterator.reset();
    +      }
    +      return this;
    +   }
    +
    +   @Override
    +   public boolean add(T t) {
    +      boolean result = consumers.add(t);
    +      if (result) {
    +         changedIteratorFieldUpdater.set(this, consumers.resettableIterator());
    +      }
    +      return result;
    +   }
    +
    +   @Override
    +   public boolean remove(T t) {
    +      boolean result = consumers.remove(t);
    +      if (result) {
    +         changedIteratorFieldUpdater.set(this, consumers.resettableIterator());
    --- End diff --
    
    Agreed. But this is not hot path, and if anything would allow in future for consumers to be concurrently added or removed without the existing bit sync blocks there is today in queueimpl


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246516870
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java ---
    @@ -0,0 +1,124 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.core.server.impl;
    +
    +import org.apache.activemq.artemis.core.PriorityAware;
    +import org.apache.activemq.artemis.utils.collections.PriorityCollection;
    +import org.apache.activemq.artemis.utils.collections.ResettableIterator;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.Set;
    +import java.util.Spliterator;
    +import java.util.concurrent.CopyOnWriteArraySet;
    +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
    +import java.util.function.Consumer;
    +
    +/**
    + * This class's purpose is to hold the consumers.
    + *
    + * CopyOnWriteArraySet is used as the underlying collection to the PriorityCollection, as it is concurrent safe,
    + * but also lock less for a read path, which is our HOT path.
    + * Also it was the underlying collection previously used in QueueImpl, before we abstracted it out to support priority consumers.
    + *
    + * There can only be one resettable iterable view,
    + * A new iterable view is created on modification, this is to keep the read HOT path performent, BUT
    + * the iterable view changes only after reset so changes in the underlying collection are only seen after a reset,
    + *
    + * All other iterators created by iterators() method are not reset-able and are created on delegating iterator().
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware,
    + *         but intent is this is the QueueImpl:ConsumerHolder.
    + */
    +public class QueueConsumersImpl<T extends PriorityAware> implements QueueConsumers<T> {
    +
    +   private final PriorityCollection<T> consumers = new PriorityCollection<>(CopyOnWriteArraySet::new);
    +   private final Collection<T> unmodifiableConsumers = Collections.unmodifiableCollection(consumers);
    +   private final AtomicReferenceFieldUpdater<QueueConsumersImpl, ResettableIterator> changedIteratorFieldUpdater =  AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, ResettableIterator.class, "changedIterator");
    +   private volatile ResettableIterator<T> changedIterator;
    +   private ResettableIterator<T> currentIterator = consumers.resettableIterator();
    +
    +   @Override
    +   public Set<Integer> getPriorites() {
    +      return consumers.getPriorites();
    +   }
    +
    +   @Override
    +   public boolean hasNext() {
    +      return currentIterator.hasNext();
    +   }
    +
    +   @Override
    +   public T next() {
    +      return currentIterator.next();
    +   }
    +
    +   @Override
    +   public QueueConsumers<T> reset() {
    +      if (changedIterator != null) {
    +         currentIterator = changedIterator;
    +         changedIteratorFieldUpdater.compareAndSet(this, changedIterator, null);
    +      } else {
    +         currentIterator.reset();
    --- End diff --
    
    Like any iterator and iterator should only be interacted by one thread at a time.


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by gemmellr <gi...@git.apache.org>.
Github user gemmellr commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r245974624
  
    --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java ---
    @@ -0,0 +1,90 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.tests.integration.amqp;
    +
    +import org.apache.activemq.transport.amqp.client.AmqpClient;
    +import org.apache.activemq.transport.amqp.client.AmqpConnection;
    +import org.apache.activemq.transport.amqp.client.AmqpMessage;
    +import org.apache.activemq.transport.amqp.client.AmqpReceiver;
    +import org.apache.activemq.transport.amqp.client.AmqpSession;
    +import org.apache.qpid.proton.amqp.Symbol;
    +import org.junit.Test;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Test various behaviors of AMQP receivers with the broker.
    + */
    +public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
    +
    +   @Test(timeout = 30000)
    +   public void testPriority() throws Exception {
    +
    +      AmqpClient client = createAmqpClient();
    +      AmqpConnection connection = addConnection(client.connect());
    +      AmqpSession session = connection.createSession();
    +
    +      Map<Symbol, Object> properties1 = new HashMap<>();
    +      properties1.put(Symbol.getSymbol("priority"), 50);
    --- End diff --
    
    I'd suggest creating consumers with priorities out of order (e.g highest in middle), so they arent simply registered in sequence, as otherwise a simple failure to round-robin delivery attempts (given every receiver has enough credit to receive all messages) might also lead to the expected result even without any priority handling consideration.


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246135542
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java ---
    @@ -52,6 +57,7 @@ public String toString() {
           StringBuffer buff = new StringBuffer(getParentString());
           buff.append(", queueName=" + queueName);
           buff.append(", filterString=" + filterString);
    +      buff.append(", priority=" + priority);
    --- End diff --
    
    done


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246521520
  
    --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java ---
    @@ -0,0 +1,78 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.utils.collections;
    +
    +import java.util.Iterator;
    +
    +/**
    + * Provides an Iterator that works over multiple underlying iterators.
    + *
    + * @param <T> type of the class of the iterator.
    + */
    +public class MultiIterator<T> implements Iterator<T> {
    --- End diff --
    
    Yeah i had a version like that but it makes using simple MultiIterator ugly to use. Uneededly. And to extend if needed only needs a cast. In case of resetable.


---

[GitHub] activemq-artemis issue #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on the issue:

    https://github.com/apache/activemq-artemis/pull/2490
  
    @franz1981 did you get a chance to look, do you think this is better than original solution? 
    
    Am keen to get this feature into the next release cut.


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246140041
  
    --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java ---
    @@ -0,0 +1,90 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.tests.integration.amqp;
    +
    +import org.apache.activemq.transport.amqp.client.AmqpClient;
    +import org.apache.activemq.transport.amqp.client.AmqpConnection;
    +import org.apache.activemq.transport.amqp.client.AmqpMessage;
    +import org.apache.activemq.transport.amqp.client.AmqpReceiver;
    +import org.apache.activemq.transport.amqp.client.AmqpSession;
    +import org.apache.qpid.proton.amqp.Symbol;
    +import org.junit.Test;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Test various behaviors of AMQP receivers with the broker.
    + */
    +public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
    +
    +   @Test(timeout = 30000)
    +   public void testPriority() throws Exception {
    +
    +      AmqpClient client = createAmqpClient();
    +      AmqpConnection connection = addConnection(client.connect());
    +      AmqpSession session = connection.createSession();
    +
    +      Map<Symbol, Object> properties1 = new HashMap<>();
    +      properties1.put(Symbol.getSymbol("priority"), 50);
    +      AmqpReceiver receiver1 = session.createReceiver(getQueueName(), null, false, false, properties1);
    +      receiver1.flow(100);
    +
    +      Map<Symbol, Object> properties2 = new HashMap<>();
    +      properties2.put(Symbol.getSymbol("priority"), 10);
    +      AmqpReceiver receiver2 = session.createReceiver(getQueueName(), null, false, false, properties2);
    +      receiver2.flow(100);
    +
    +      Map<Symbol, Object> properties3 = new HashMap<>();
    +      properties3.put(Symbol.getSymbol("priority"), 5);
    +      AmqpReceiver receiver3 = session.createReceiver(getQueueName(), null, false, false, properties3);
    +      receiver3.flow(100);
    +
    +      sendMessages(getQueueName(), 5);
    +
    +
    +      for (int i = 0; i < 5; i++) {
    +         AmqpMessage message1 = receiver1.receive(250, TimeUnit.MILLISECONDS);
    +         AmqpMessage message2 = receiver2.receive(250, TimeUnit.MILLISECONDS);
    --- End diff --
    
    changed to receiveNoWait


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246376356
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java ---
    @@ -0,0 +1,124 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.core.server.impl;
    +
    +import org.apache.activemq.artemis.core.PriorityAware;
    +import org.apache.activemq.artemis.utils.collections.PriorityCollection;
    +import org.apache.activemq.artemis.utils.collections.ResettableIterator;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.Set;
    +import java.util.Spliterator;
    +import java.util.concurrent.CopyOnWriteArraySet;
    +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
    +import java.util.function.Consumer;
    +
    +/**
    + * This class's purpose is to hold the consumers.
    + *
    + * CopyOnWriteArraySet is used as the underlying collection to the PriorityCollection, as it is concurrent safe,
    + * but also lock less for a read path, which is our HOT path.
    + * Also it was the underlying collection previously used in QueueImpl, before we abstracted it out to support priority consumers.
    + *
    + * There can only be one resettable iterable view,
    + * A new iterable view is created on modification, this is to keep the read HOT path performent, BUT
    + * the iterable view changes only after reset so changes in the underlying collection are only seen after a reset,
    + *
    + * All other iterators created by iterators() method are not reset-able and are created on delegating iterator().
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware,
    + *         but intent is this is the QueueImpl:ConsumerHolder.
    + */
    +public class QueueConsumersImpl<T extends PriorityAware> implements QueueConsumers<T> {
    +
    +   private final PriorityCollection<T> consumers = new PriorityCollection<>(CopyOnWriteArraySet::new);
    +   private final Collection<T> unmodifiableConsumers = Collections.unmodifiableCollection(consumers);
    +   private final AtomicReferenceFieldUpdater<QueueConsumersImpl, ResettableIterator> changedIteratorFieldUpdater =  AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, ResettableIterator.class, "changedIterator");
    +   private volatile ResettableIterator<T> changedIterator;
    +   private ResettableIterator<T> currentIterator = consumers.resettableIterator();
    +
    +   @Override
    +   public Set<Integer> getPriorites() {
    +      return consumers.getPriorites();
    +   }
    +
    +   @Override
    +   public boolean hasNext() {
    +      return currentIterator.hasNext();
    +   }
    +
    +   @Override
    +   public T next() {
    +      return currentIterator.next();
    +   }
    +
    +   @Override
    +   public QueueConsumers<T> reset() {
    +      if (changedIterator != null) {
    +         currentIterator = changedIterator;
    +         changedIteratorFieldUpdater.compareAndSet(this, changedIterator, null);
    +      } else {
    +         currentIterator.reset();
    +      }
    +      return this;
    +   }
    +
    +   @Override
    +   public boolean add(T t) {
    +      boolean result = consumers.add(t);
    +      if (result) {
    +         changedIteratorFieldUpdater.set(this, consumers.resettableIterator());
    --- End diff --
    
    `lazeSet` is enough for single-writer/single-threaded semantic 


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246128551
  
    --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java ---
    @@ -0,0 +1,90 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.tests.integration.amqp;
    +
    +import org.apache.activemq.transport.amqp.client.AmqpClient;
    +import org.apache.activemq.transport.amqp.client.AmqpConnection;
    +import org.apache.activemq.transport.amqp.client.AmqpMessage;
    +import org.apache.activemq.transport.amqp.client.AmqpReceiver;
    +import org.apache.activemq.transport.amqp.client.AmqpSession;
    +import org.apache.qpid.proton.amqp.Symbol;
    +import org.junit.Test;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Test various behaviors of AMQP receivers with the broker.
    + */
    +public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
    +
    +   @Test(timeout = 30000)
    +   public void testPriority() throws Exception {
    +
    +      AmqpClient client = createAmqpClient();
    +      AmqpConnection connection = addConnection(client.connect());
    +      AmqpSession session = connection.createSession();
    +
    +      Map<Symbol, Object> properties1 = new HashMap<>();
    +      properties1.put(Symbol.getSymbol("priority"), 50);
    --- End diff --
    
    This is actually tested on the queueconsumerimpl test. But agree we can do same here


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246376178
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java ---
    @@ -0,0 +1,124 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.core.server.impl;
    +
    +import org.apache.activemq.artemis.core.PriorityAware;
    +import org.apache.activemq.artemis.utils.collections.PriorityCollection;
    +import org.apache.activemq.artemis.utils.collections.ResettableIterator;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.Set;
    +import java.util.Spliterator;
    +import java.util.concurrent.CopyOnWriteArraySet;
    +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
    +import java.util.function.Consumer;
    +
    +/**
    + * This class's purpose is to hold the consumers.
    + *
    + * CopyOnWriteArraySet is used as the underlying collection to the PriorityCollection, as it is concurrent safe,
    + * but also lock less for a read path, which is our HOT path.
    + * Also it was the underlying collection previously used in QueueImpl, before we abstracted it out to support priority consumers.
    + *
    + * There can only be one resettable iterable view,
    + * A new iterable view is created on modification, this is to keep the read HOT path performent, BUT
    + * the iterable view changes only after reset so changes in the underlying collection are only seen after a reset,
    + *
    + * All other iterators created by iterators() method are not reset-able and are created on delegating iterator().
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware,
    + *         but intent is this is the QueueImpl:ConsumerHolder.
    + */
    +public class QueueConsumersImpl<T extends PriorityAware> implements QueueConsumers<T> {
    +
    +   private final PriorityCollection<T> consumers = new PriorityCollection<>(CopyOnWriteArraySet::new);
    +   private final Collection<T> unmodifiableConsumers = Collections.unmodifiableCollection(consumers);
    +   private final AtomicReferenceFieldUpdater<QueueConsumersImpl, ResettableIterator> changedIteratorFieldUpdater =  AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, ResettableIterator.class, "changedIterator");
    +   private volatile ResettableIterator<T> changedIterator;
    +   private ResettableIterator<T> currentIterator = consumers.resettableIterator();
    +
    +   @Override
    +   public Set<Integer> getPriorites() {
    +      return consumers.getPriorites();
    +   }
    +
    +   @Override
    +   public boolean hasNext() {
    +      return currentIterator.hasNext();
    +   }
    +
    +   @Override
    +   public T next() {
    +      return currentIterator.next();
    +   }
    +
    +   @Override
    +   public QueueConsumers<T> reset() {
    +      if (changedIterator != null) {
    +         currentIterator = changedIterator;
    +         changedIteratorFieldUpdater.compareAndSet(this, changedIterator, null);
    +      } else {
    +         currentIterator.reset();
    --- End diff --
    
    I suppose that `reset` is safe to be called just by one thread at time, if not, it would be complex because `currentIterator` could be changed indipendently


---

[GitHub] activemq-artemis issue #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on the issue:

    https://github.com/apache/activemq-artemis/pull/2490
  
    @franz1981 good point around adding further tests to the extracted out bits. Agree it will make everything more robust


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246520138
  
    --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java ---
    @@ -0,0 +1,319 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.utils.collections;
    +
    +import org.apache.activemq.artemis.core.PriorityAware;
    +
    +import java.lang.reflect.Array;
    +import java.util.AbstractCollection;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Iterator;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.function.Consumer;
    +import java.util.function.Supplier;
    +import java.util.stream.Collectors;
    +
    +/**
    + * This class's purpose is to hold the the different collections used for each priority level.
    + *
    + * A supplier is required to provide the underlying collection needed when a new priority level is seen,
    + * and the end behaviour is that of the underlying collection, e.g. if set add will follow set's add semantics,
    + * if list, then list semantics.
    + *
    + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these.
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware.
    + */
    +public class PriorityCollection<T extends PriorityAware> extends AbstractCollection<T> {
    +
    +   private final Supplier<Collection<T>> supplier;
    +   private volatile PriorityHolder<T>[] priorityHolders = newPrioritySetArrayInstance(0);
    +   private volatile int size;
    +
    +   private void setArray(PriorityHolder<T>[] priorityHolders) {
    +      this.priorityHolders = priorityHolders;
    +   }
    +
    +   private PriorityHolder<T>[] getArray() {
    +      return priorityHolders;
    +   }
    +
    +
    +   public PriorityCollection(Supplier<Collection<T>> supplier) {
    +      this.supplier = supplier;
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> PriorityHolder<T>[] newPrioritySetArrayInstance(int length) {
    +      return (PriorityHolder<T>[]) Array.newInstance(PriorityHolder.class, length);
    +   }
    +
    +   @Override
    +   public int size() {
    +      return size;
    +   }
    +
    +   @Override
    +   public boolean isEmpty() {
    +      return size() == 0;
    +   }
    +
    +   public Set<Integer> getPriorites() {
    +      PriorityHolder<T>[] snapshot = getArray();
    +      return Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet());
    +   }
    +
    +   @Override
    +   public Iterator<T> iterator() {
    +      Iterator<T>[] iterators = getIterators();
    +      return new MultiIterator<>(iterators);
    +   }
    +
    +   private Iterator<T>[] getIterators() {
    +      PriorityHolder<T>[] snapshot = this.getArray();
    +      int size = snapshot.length;
    +      Iterator<T>[] iterators = newIteratorArrayInstance(size);
    +      for (int i = 0; i < size; i++) {
    +         iterators[i] = snapshot[i].getValues().iterator();
    +      }
    +      return iterators;
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> Iterator<T>[] newIteratorArrayInstance(int length) {
    +      return (Iterator<T>[]) Array.newInstance(Iterator.class, length);
    +   }
    +
    +   public ResettableIterator<T> resettableIterator() {
    +      return new MultiResettableIterator<T>(getResettableIterators());
    +   }
    +
    +   private ResettableIterator<T>[] getResettableIterators() {
    +      PriorityHolder<T>[] snapshot = this.getArray();
    +      int size = snapshot.length;
    +      ResettableIterator<T>[] iterators = newResettableIteratorArrayInstance(size);
    +      for (int i = 0; i < size; i++) {
    +         iterators[i] = new ArrayResettableIterator<>(snapshot[i].getValues().toArray());
    +      }
    +      return iterators;
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> ResettableIterator<T>[] newResettableIteratorArrayInstance(int length) {
    +      return (ResettableIterator<T>[]) Array.newInstance(ResettableIterator.class, length);
    +   }
    +
    +   @Override
    +   public void forEach(Consumer<? super T> action) {
    +      Objects.requireNonNull(action);
    +      PriorityHolder<T>[] current = getArray();
    +      int len = current.length;
    +      for (int i = 0; i < len; ++i) {
    +         current[i].getValues().forEach(action);
    +      }
    +   }
    +
    +   private Collection<T> getCollection(int priority, boolean createIfMissing) {
    +      PriorityHolder<T>[] current = getArray();
    +      int low = 0;
    +      int high = current.length - 1;
    +
    +      while (low <= high) {
    +         int mid = (low + high) >>> 1;
    +         PriorityHolder<T> midVal = current[mid];
    +
    +         if (midVal.getPriority() > priority)
    +            low = mid + 1;
    +         else if (midVal.getPriority() < priority)
    +            high = mid - 1;
    +         else
    +            return midVal.getValues(); //key found
    +      }
    +
    +      if (createIfMissing) {
    +         PriorityHolder<T>[] newArray = newPrioritySetArrayInstance(current.length + 1);
    +         if (low > 0) {
    +            System.arraycopy(current, 0, newArray, 0, low);
    +         }
    +         if (current.length - low > 0) {
    +            System.arraycopy(current, low, newArray, low + 1, current.length - low);
    +         }
    +         newArray[low] = new PriorityHolder<T>(priority, supplier);
    +         setArray(newArray);
    +         return newArray[low].getValues();
    +      }
    +      return null;
    +   }
    +
    +   @Override
    +   public synchronized boolean add(T t) {
    +      boolean result = addInternal(t);
    +      calcSize();
    +      return result;
    +   }
    +
    +   private boolean addInternal(T t) {
    +      if (t == null) return false;
    +      Collection<T> priority = getCollection(t.getPriority(), true);
    +      return priority.add(t);
    +   }
    +
    +   @Override
    +   public boolean remove(Object o) {
    +      return o instanceof PriorityAware && remove((PriorityAware) o);
    +   }
    +
    +   public synchronized boolean remove(PriorityAware priorityAware) {
    +      boolean result = removeInternal(priorityAware);
    +      calcSize();
    +      return result;
    +   }
    +
    +   private boolean removeInternal(PriorityAware priorityAware) {
    +      if ( priorityAware == null) return false;
    +      Collection<T> priority = getCollection(priorityAware.getPriority(), false);
    +      boolean result = priority != null && priority.remove(priorityAware);
    +      if (priority != null && priority.size() == 0) {
    +         removeCollection(priorityAware.getPriority());
    +      }
    +      return result;
    +   }
    +
    +   private Collection<T> removeCollection(int priority) {
    +      PriorityHolder<T>[] current = getArray();
    +      int len = current.length;
    +      int low = 0;
    +      int high = len - 1;
    +
    +      while (low <= high) {
    +         int mid = (low + high) >>> 1;
    +         PriorityHolder<T> midVal = current[mid];
    +
    +         if (midVal.getPriority() > priority)
    +            low = mid + 1;
    +         else if (midVal.getPriority() < priority)
    +            high = mid - 1;
    +         else {
    +            PriorityHolder<T>[] newArray = newPrioritySetArrayInstance(len - 1);
    +            System.arraycopy(current, 0, newArray, 0, mid);
    +            System.arraycopy(current, mid + 1, newArray, mid, len - mid - 1);
    +            setArray(newArray);
    +            return midVal.getValues(); //key found
    +         }
    +      }
    +      return null;
    +   }
    +
    +   @Override
    +   public boolean containsAll(Collection<?> c) {
    +      Objects.requireNonNull(c);
    +      for (Object e : c)
    +         if (!contains(e))
    +            return false;
    +      return true;
    +   }
    +
    +   @Override
    +   public synchronized boolean addAll(Collection<? extends T> c) {
    +      Objects.requireNonNull(c);
    +      boolean modified = false;
    +      for (T e : c)
    +         if (addInternal(e))
    +            modified = true;
    +      calcSize();
    +      return modified;
    +   }
    +
    +   @Override
    +   public synchronized boolean removeAll(Collection<?> c) {
    +      Objects.requireNonNull(c);
    +      boolean modified = false;
    +      for (Object o : c) {
    +         if (remove(o)) {
    +            modified = true;
    +         }
    +      }
    +      calcSize();
    +      return modified;
    +   }
    +
    +   @Override
    +   public synchronized boolean retainAll(Collection<?> c) {
    +      Objects.requireNonNull(c);
    +      boolean modified = false;
    +      PriorityHolder<T>[] snapshot = getArray();
    +      for (PriorityHolder<T> priorityHolder : snapshot) {
    +         if (priorityHolder.getValues().retainAll(c)) {
    +            modified = true;
    +         }
    +      }
    +      calcSize();
    +      return modified;
    +   }
    +
    +   @Override
    +   public synchronized void clear() {
    +      PriorityHolder<T>[] snapshot = getArray();
    +      for (PriorityHolder<T> priorityHolder : snapshot) {
    +         priorityHolder.getValues().clear();
    +      }
    +      calcSize();
    +   }
    +
    +   @Override
    +   public boolean contains(Object o) {
    +      return o instanceof PriorityAware && contains((PriorityAware) o);
    +   }
    +
    +   public boolean contains(PriorityAware priorityAware) {
    +      if (priorityAware == null) return false;
    +      Collection<T> prioritySet = getCollection(priorityAware.getPriority(), false);
    +      return prioritySet != null && prioritySet.contains(priorityAware);
    +   }
    +
    +   private void calcSize() {
    +      PriorityHolder<T>[] current = getArray();
    +      int size = 0;
    +      for (PriorityHolder<T> priorityHolder : current) {
    +         size += priorityHolder.getValues().size();
    --- End diff --
    
    If we have a case of the number of active consumers on a single queue is integer max value we will have many areas to address tbh! And that is some super extreme case.
    
    We can reject on adding a new consumer if adding the new consumer would breach int max value, if we are really worried


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246323602
  
    --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java ---
    @@ -0,0 +1,78 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.utils.collections;
    +
    +import java.util.Iterator;
    +
    +/**
    + * Provides an Iterator that works over multiple underlying iterators.
    + *
    + * @param <T> type of the class of the iterator.
    + */
    +public class MultiIterator<T> implements Iterator<T> {
    +
    +   private final Iterator<T>[] iterators;
    +   int index = -1;
    +
    +   public MultiIterator(Iterator<T>[] iterators) {
    +      this.iterators = iterators;
    +   }
    +
    +   @Override
    +   public boolean hasNext() {
    +      while (true) {
    +         if (index != -1) {
    +            Iterator<T> currentIterator = get(index);
    +            if (currentIterator.hasNext()) {
    +               return true;
    +            }
    +         }
    +         int next = index + 1;
    +         if (next < iterators.length) {
    +            moveTo(next);
    +         } else {
    +            return false;
    +         }
    +      }
    +   }
    +
    +   @Override
    +   public T next() {
    +      while (true) {
    +         if (index != -1) {
    +            Iterator<T> currentIterator = get(index);
    +            if (currentIterator.hasNext()) {
    +               return currentIterator.next();
    +            }
    +         }
    +         int next = index + 1;
    +         if (next < iterators.length) {
    +            moveTo(next);
    +         } else {
    +            return null;
    +         }
    +      }
    +   }
    +
    +   protected void moveTo(int index) {
    +      this.index = index;
    +   }
    +
    +   protected Iterator<T> get(int index) {
    --- End diff --
    
    ```java
    protected I get(int index) 
    ```


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246518669
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java ---
    @@ -84,6 +94,10 @@ public void setFilterString(SimpleString filterString) {
           this.filterString = filterString;
        }
     
    +   public void setPriority(byte priority) {
    --- End diff --
    
    Keeping inline with the rest of the fields and general approach. Id rather keep to status quo


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246376376
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java ---
    @@ -0,0 +1,124 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.core.server.impl;
    +
    +import org.apache.activemq.artemis.core.PriorityAware;
    +import org.apache.activemq.artemis.utils.collections.PriorityCollection;
    +import org.apache.activemq.artemis.utils.collections.ResettableIterator;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.Set;
    +import java.util.Spliterator;
    +import java.util.concurrent.CopyOnWriteArraySet;
    +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
    +import java.util.function.Consumer;
    +
    +/**
    + * This class's purpose is to hold the consumers.
    + *
    + * CopyOnWriteArraySet is used as the underlying collection to the PriorityCollection, as it is concurrent safe,
    + * but also lock less for a read path, which is our HOT path.
    + * Also it was the underlying collection previously used in QueueImpl, before we abstracted it out to support priority consumers.
    + *
    + * There can only be one resettable iterable view,
    + * A new iterable view is created on modification, this is to keep the read HOT path performent, BUT
    + * the iterable view changes only after reset so changes in the underlying collection are only seen after a reset,
    + *
    + * All other iterators created by iterators() method are not reset-able and are created on delegating iterator().
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware,
    + *         but intent is this is the QueueImpl:ConsumerHolder.
    + */
    +public class QueueConsumersImpl<T extends PriorityAware> implements QueueConsumers<T> {
    +
    +   private final PriorityCollection<T> consumers = new PriorityCollection<>(CopyOnWriteArraySet::new);
    +   private final Collection<T> unmodifiableConsumers = Collections.unmodifiableCollection(consumers);
    +   private final AtomicReferenceFieldUpdater<QueueConsumersImpl, ResettableIterator> changedIteratorFieldUpdater =  AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, ResettableIterator.class, "changedIterator");
    +   private volatile ResettableIterator<T> changedIterator;
    +   private ResettableIterator<T> currentIterator = consumers.resettableIterator();
    +
    +   @Override
    +   public Set<Integer> getPriorites() {
    +      return consumers.getPriorites();
    +   }
    +
    +   @Override
    +   public boolean hasNext() {
    +      return currentIterator.hasNext();
    +   }
    +
    +   @Override
    +   public T next() {
    +      return currentIterator.next();
    +   }
    +
    +   @Override
    +   public QueueConsumers<T> reset() {
    +      if (changedIterator != null) {
    +         currentIterator = changedIterator;
    +         changedIteratorFieldUpdater.compareAndSet(this, changedIterator, null);
    +      } else {
    +         currentIterator.reset();
    +      }
    +      return this;
    +   }
    +
    +   @Override
    +   public boolean add(T t) {
    +      boolean result = consumers.add(t);
    +      if (result) {
    +         changedIteratorFieldUpdater.set(this, consumers.resettableIterator());
    +      }
    +      return result;
    +   }
    +
    +   @Override
    +   public boolean remove(T t) {
    +      boolean result = consumers.remove(t);
    +      if (result) {
    +         changedIteratorFieldUpdater.set(this, consumers.resettableIterator());
    --- End diff --
    
    `lazeSet` is enough for single-writer/single-threaded semantic 


---

[GitHub] activemq-artemis issue #2490: V2 196

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on the issue:

    https://github.com/apache/activemq-artemis/pull/2490
  
    @mochaelandrepearce sadly haven't had much time today to look into it :( 
    Tomorrow I have already scheduled some time in the morning to take a look into this


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by gemmellr <gi...@git.apache.org>.
Github user gemmellr commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246398611
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java ---
    @@ -104,6 +119,11 @@ public void decodeRest(final ActiveMQBuffer buffer) {
           filterString = buffer.readNullableSimpleString();
           browseOnly = buffer.readBoolean();
           requiresResponse = buffer.readBoolean();
    +      if (buffer.readableBytes() > 0) {
    --- End diff --
    
    Does this comment cover to the bit around the safety of always sending the new additional data even to old servers? I can't tell if its covered.
    
    I think it should at the very least be commented what/when the encoding+decoding handling behaviour changed so folks can understand the implications later without heading to find past commits.


---

[GitHub] activemq-artemis issue #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on the issue:

    https://github.com/apache/activemq-artemis/pull/2490
  
    @gemmellr thanks for review, if you could recheck the AMQPSessionCallback for me, to make sure i understood you. 
    
    As for the Openwire test case, this was a simple port over of the existing activemq5 test case as untouched as possible, i agree we could reduce the time but id rather (at least for this release) keep it the same, so we can be sure feature works for openwire same as activemq5.


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246518313
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java ---
    @@ -0,0 +1,124 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.core.server.impl;
    +
    +import org.apache.activemq.artemis.core.PriorityAware;
    +import org.apache.activemq.artemis.utils.collections.PriorityCollection;
    +import org.apache.activemq.artemis.utils.collections.ResettableIterator;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.Set;
    +import java.util.Spliterator;
    +import java.util.concurrent.CopyOnWriteArraySet;
    +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
    +import java.util.function.Consumer;
    +
    +/**
    + * This class's purpose is to hold the consumers.
    + *
    + * CopyOnWriteArraySet is used as the underlying collection to the PriorityCollection, as it is concurrent safe,
    + * but also lock less for a read path, which is our HOT path.
    + * Also it was the underlying collection previously used in QueueImpl, before we abstracted it out to support priority consumers.
    + *
    + * There can only be one resettable iterable view,
    + * A new iterable view is created on modification, this is to keep the read HOT path performent, BUT
    + * the iterable view changes only after reset so changes in the underlying collection are only seen after a reset,
    + *
    + * All other iterators created by iterators() method are not reset-able and are created on delegating iterator().
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware,
    + *         but intent is this is the QueueImpl:ConsumerHolder.
    + */
    +public class QueueConsumersImpl<T extends PriorityAware> implements QueueConsumers<T> {
    +
    +   private final PriorityCollection<T> consumers = new PriorityCollection<>(CopyOnWriteArraySet::new);
    +   private final Collection<T> unmodifiableConsumers = Collections.unmodifiableCollection(consumers);
    +   private final AtomicReferenceFieldUpdater<QueueConsumersImpl, ResettableIterator> changedIteratorFieldUpdater =  AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, ResettableIterator.class, "changedIterator");
    +   private volatile ResettableIterator<T> changedIterator;
    +   private ResettableIterator<T> currentIterator = consumers.resettableIterator();
    +
    +   @Override
    +   public Set<Integer> getPriorites() {
    +      return consumers.getPriorites();
    +   }
    +
    +   @Override
    +   public boolean hasNext() {
    +      return currentIterator.hasNext();
    +   }
    +
    +   @Override
    +   public T next() {
    +      return currentIterator.next();
    +   }
    +
    +   @Override
    +   public QueueConsumers<T> reset() {
    +      if (changedIterator != null) {
    +         currentIterator = changedIterator;
    +         changedIteratorFieldUpdater.compareAndSet(this, changedIterator, null);
    --- End diff --
    
    If it fails it means whilst we were reseting and changing reference to the new iterator. a consumer was added or removed and another new iterator was created (as this can occur whilsy we are iterating or resetting.) And means simply whilst this reset updated its iterator, on next reset it needs to switch over its again. We wouldnt want to lazy set to null 


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246518747
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java ---
    @@ -0,0 +1,124 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.core.server.impl;
    +
    +import org.apache.activemq.artemis.core.PriorityAware;
    +import org.apache.activemq.artemis.utils.collections.PriorityCollection;
    +import org.apache.activemq.artemis.utils.collections.ResettableIterator;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.Set;
    +import java.util.Spliterator;
    +import java.util.concurrent.CopyOnWriteArraySet;
    +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
    +import java.util.function.Consumer;
    +
    +/**
    + * This class's purpose is to hold the consumers.
    + *
    + * CopyOnWriteArraySet is used as the underlying collection to the PriorityCollection, as it is concurrent safe,
    + * but also lock less for a read path, which is our HOT path.
    + * Also it was the underlying collection previously used in QueueImpl, before we abstracted it out to support priority consumers.
    + *
    + * There can only be one resettable iterable view,
    + * A new iterable view is created on modification, this is to keep the read HOT path performent, BUT
    + * the iterable view changes only after reset so changes in the underlying collection are only seen after a reset,
    + *
    + * All other iterators created by iterators() method are not reset-able and are created on delegating iterator().
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware,
    + *         but intent is this is the QueueImpl:ConsumerHolder.
    + */
    +public class QueueConsumersImpl<T extends PriorityAware> implements QueueConsumers<T> {
    +
    +   private final PriorityCollection<T> consumers = new PriorityCollection<>(CopyOnWriteArraySet::new);
    +   private final Collection<T> unmodifiableConsumers = Collections.unmodifiableCollection(consumers);
    +   private final AtomicReferenceFieldUpdater<QueueConsumersImpl, ResettableIterator> changedIteratorFieldUpdater =  AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, ResettableIterator.class, "changedIterator");
    +   private volatile ResettableIterator<T> changedIterator;
    +   private ResettableIterator<T> currentIterator = consumers.resettableIterator();
    +
    +   @Override
    +   public Set<Integer> getPriorites() {
    +      return consumers.getPriorites();
    +   }
    +
    +   @Override
    +   public boolean hasNext() {
    +      return currentIterator.hasNext();
    +   }
    +
    +   @Override
    +   public T next() {
    +      return currentIterator.next();
    +   }
    +
    +   @Override
    +   public QueueConsumers<T> reset() {
    +      if (changedIterator != null) {
    --- End diff --
    
    Nice


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246375651
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java ---
    @@ -0,0 +1,124 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.core.server.impl;
    +
    +import org.apache.activemq.artemis.core.PriorityAware;
    +import org.apache.activemq.artemis.utils.collections.PriorityCollection;
    +import org.apache.activemq.artemis.utils.collections.ResettableIterator;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.Set;
    +import java.util.Spliterator;
    +import java.util.concurrent.CopyOnWriteArraySet;
    +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
    +import java.util.function.Consumer;
    +
    +/**
    + * This class's purpose is to hold the consumers.
    + *
    + * CopyOnWriteArraySet is used as the underlying collection to the PriorityCollection, as it is concurrent safe,
    + * but also lock less for a read path, which is our HOT path.
    + * Also it was the underlying collection previously used in QueueImpl, before we abstracted it out to support priority consumers.
    + *
    + * There can only be one resettable iterable view,
    + * A new iterable view is created on modification, this is to keep the read HOT path performent, BUT
    + * the iterable view changes only after reset so changes in the underlying collection are only seen after a reset,
    + *
    + * All other iterators created by iterators() method are not reset-able and are created on delegating iterator().
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware,
    + *         but intent is this is the QueueImpl:ConsumerHolder.
    + */
    +public class QueueConsumersImpl<T extends PriorityAware> implements QueueConsumers<T> {
    +
    +   private final PriorityCollection<T> consumers = new PriorityCollection<>(CopyOnWriteArraySet::new);
    +   private final Collection<T> unmodifiableConsumers = Collections.unmodifiableCollection(consumers);
    +   private final AtomicReferenceFieldUpdater<QueueConsumersImpl, ResettableIterator> changedIteratorFieldUpdater =  AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, ResettableIterator.class, "changedIterator");
    +   private volatile ResettableIterator<T> changedIterator;
    +   private ResettableIterator<T> currentIterator = consumers.resettableIterator();
    +
    +   @Override
    +   public Set<Integer> getPriorites() {
    +      return consumers.getPriorites();
    +   }
    +
    +   @Override
    +   public boolean hasNext() {
    +      return currentIterator.hasNext();
    +   }
    +
    +   @Override
    +   public T next() {
    +      return currentIterator.next();
    +   }
    +
    +   @Override
    +   public QueueConsumers<T> reset() {
    +      if (changedIterator != null) {
    +         currentIterator = changedIterator;
    +         changedIteratorFieldUpdater.compareAndSet(this, changedIterator, null);
    --- End diff --
    
    What happen if the cas will fail? we ends with a reset() that is not nulling `changedIterator`.
    Given that we are just clearing the `changedIterator` to `null` a `lazySet(this, null) is enough.


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246520411
  
    --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java ---
    @@ -0,0 +1,319 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.utils.collections;
    +
    +import org.apache.activemq.artemis.core.PriorityAware;
    +
    +import java.lang.reflect.Array;
    +import java.util.AbstractCollection;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Iterator;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.function.Consumer;
    +import java.util.function.Supplier;
    +import java.util.stream.Collectors;
    +
    +/**
    + * This class's purpose is to hold the the different collections used for each priority level.
    + *
    + * A supplier is required to provide the underlying collection needed when a new priority level is seen,
    + * and the end behaviour is that of the underlying collection, e.g. if set add will follow set's add semantics,
    + * if list, then list semantics.
    + *
    + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these.
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware.
    + */
    +public class PriorityCollection<T extends PriorityAware> extends AbstractCollection<T> {
    +
    +   private final Supplier<Collection<T>> supplier;
    +   private volatile PriorityHolder<T>[] priorityHolders = newPrioritySetArrayInstance(0);
    +   private volatile int size;
    +
    +   private void setArray(PriorityHolder<T>[] priorityHolders) {
    +      this.priorityHolders = priorityHolders;
    +   }
    +
    +   private PriorityHolder<T>[] getArray() {
    +      return priorityHolders;
    +   }
    +
    +
    +   public PriorityCollection(Supplier<Collection<T>> supplier) {
    +      this.supplier = supplier;
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> PriorityHolder<T>[] newPrioritySetArrayInstance(int length) {
    +      return (PriorityHolder<T>[]) Array.newInstance(PriorityHolder.class, length);
    +   }
    +
    +   @Override
    +   public int size() {
    +      return size;
    +   }
    +
    +   @Override
    +   public boolean isEmpty() {
    +      return size() == 0;
    +   }
    +
    +   public Set<Integer> getPriorites() {
    +      PriorityHolder<T>[] snapshot = getArray();
    +      return Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet());
    +   }
    +
    +   @Override
    +   public Iterator<T> iterator() {
    +      Iterator<T>[] iterators = getIterators();
    +      return new MultiIterator<>(iterators);
    +   }
    +
    +   private Iterator<T>[] getIterators() {
    +      PriorityHolder<T>[] snapshot = this.getArray();
    +      int size = snapshot.length;
    +      Iterator<T>[] iterators = newIteratorArrayInstance(size);
    +      for (int i = 0; i < size; i++) {
    +         iterators[i] = snapshot[i].getValues().iterator();
    +      }
    +      return iterators;
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> Iterator<T>[] newIteratorArrayInstance(int length) {
    +      return (Iterator<T>[]) Array.newInstance(Iterator.class, length);
    +   }
    +
    +   public ResettableIterator<T> resettableIterator() {
    +      return new MultiResettableIterator<T>(getResettableIterators());
    +   }
    +
    +   private ResettableIterator<T>[] getResettableIterators() {
    +      PriorityHolder<T>[] snapshot = this.getArray();
    +      int size = snapshot.length;
    +      ResettableIterator<T>[] iterators = newResettableIteratorArrayInstance(size);
    +      for (int i = 0; i < size; i++) {
    +         iterators[i] = new ArrayResettableIterator<>(snapshot[i].getValues().toArray());
    +      }
    +      return iterators;
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> ResettableIterator<T>[] newResettableIteratorArrayInstance(int length) {
    +      return (ResettableIterator<T>[]) Array.newInstance(ResettableIterator.class, length);
    +   }
    +
    +   @Override
    +   public void forEach(Consumer<? super T> action) {
    +      Objects.requireNonNull(action);
    +      PriorityHolder<T>[] current = getArray();
    +      int len = current.length;
    +      for (int i = 0; i < len; ++i) {
    +         current[i].getValues().forEach(action);
    +      }
    +   }
    +
    +   private Collection<T> getCollection(int priority, boolean createIfMissing) {
    +      PriorityHolder<T>[] current = getArray();
    +      int low = 0;
    +      int high = current.length - 1;
    +
    +      while (low <= high) {
    +         int mid = (low + high) >>> 1;
    +         PriorityHolder<T> midVal = current[mid];
    +
    +         if (midVal.getPriority() > priority)
    +            low = mid + 1;
    +         else if (midVal.getPriority() < priority)
    +            high = mid - 1;
    +         else
    +            return midVal.getValues(); //key found
    +      }
    +
    +      if (createIfMissing) {
    +         PriorityHolder<T>[] newArray = newPrioritySetArrayInstance(current.length + 1);
    +         if (low > 0) {
    +            System.arraycopy(current, 0, newArray, 0, low);
    +         }
    +         if (current.length - low > 0) {
    +            System.arraycopy(current, low, newArray, low + 1, current.length - low);
    +         }
    +         newArray[low] = new PriorityHolder<T>(priority, supplier);
    +         setArray(newArray);
    +         return newArray[low].getValues();
    +      }
    +      return null;
    +   }
    +
    +   @Override
    +   public synchronized boolean add(T t) {
    +      boolean result = addInternal(t);
    +      calcSize();
    +      return result;
    +   }
    +
    +   private boolean addInternal(T t) {
    +      if (t == null) return false;
    +      Collection<T> priority = getCollection(t.getPriority(), true);
    +      return priority.add(t);
    +   }
    +
    +   @Override
    +   public boolean remove(Object o) {
    +      return o instanceof PriorityAware && remove((PriorityAware) o);
    +   }
    +
    +   public synchronized boolean remove(PriorityAware priorityAware) {
    +      boolean result = removeInternal(priorityAware);
    +      calcSize();
    +      return result;
    +   }
    +
    +   private boolean removeInternal(PriorityAware priorityAware) {
    +      if ( priorityAware == null) return false;
    +      Collection<T> priority = getCollection(priorityAware.getPriority(), false);
    +      boolean result = priority != null && priority.remove(priorityAware);
    +      if (priority != null && priority.size() == 0) {
    +         removeCollection(priorityAware.getPriority());
    +      }
    +      return result;
    +   }
    +
    +   private Collection<T> removeCollection(int priority) {
    +      PriorityHolder<T>[] current = getArray();
    +      int len = current.length;
    +      int low = 0;
    +      int high = len - 1;
    +
    +      while (low <= high) {
    +         int mid = (low + high) >>> 1;
    --- End diff --
    
    This is just coding pref for this stuff im happier with shift operations. Its how alot of the jvm classes do this.


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246374340
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java ---
    @@ -0,0 +1,124 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.core.server.impl;
    +
    +import org.apache.activemq.artemis.core.PriorityAware;
    +import org.apache.activemq.artemis.utils.collections.PriorityCollection;
    +import org.apache.activemq.artemis.utils.collections.ResettableIterator;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.Set;
    +import java.util.Spliterator;
    +import java.util.concurrent.CopyOnWriteArraySet;
    +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
    +import java.util.function.Consumer;
    +
    +/**
    + * This class's purpose is to hold the consumers.
    + *
    + * CopyOnWriteArraySet is used as the underlying collection to the PriorityCollection, as it is concurrent safe,
    + * but also lock less for a read path, which is our HOT path.
    + * Also it was the underlying collection previously used in QueueImpl, before we abstracted it out to support priority consumers.
    + *
    + * There can only be one resettable iterable view,
    + * A new iterable view is created on modification, this is to keep the read HOT path performent, BUT
    + * the iterable view changes only after reset so changes in the underlying collection are only seen after a reset,
    + *
    + * All other iterators created by iterators() method are not reset-able and are created on delegating iterator().
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware,
    + *         but intent is this is the QueueImpl:ConsumerHolder.
    + */
    +public class QueueConsumersImpl<T extends PriorityAware> implements QueueConsumers<T> {
    +
    +   private final PriorityCollection<T> consumers = new PriorityCollection<>(CopyOnWriteArraySet::new);
    +   private final Collection<T> unmodifiableConsumers = Collections.unmodifiableCollection(consumers);
    +   private final AtomicReferenceFieldUpdater<QueueConsumersImpl, ResettableIterator> changedIteratorFieldUpdater =  AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, ResettableIterator.class, "changedIterator");
    +   private volatile ResettableIterator<T> changedIterator;
    +   private ResettableIterator<T> currentIterator = consumers.resettableIterator();
    +
    +   @Override
    +   public Set<Integer> getPriorites() {
    +      return consumers.getPriorites();
    +   }
    +
    +   @Override
    +   public boolean hasNext() {
    +      return currentIterator.hasNext();
    +   }
    +
    +   @Override
    +   public T next() {
    +      return currentIterator.next();
    +   }
    +
    +   @Override
    +   public QueueConsumers<T> reset() {
    +      if (changedIterator != null) {
    --- End diff --
    
    I will just save `changedIterator` once and will perform a logic with a local value instead of volatile load twice


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246525755
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java ---
    @@ -0,0 +1,124 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.core.server.impl;
    +
    +import org.apache.activemq.artemis.core.PriorityAware;
    +import org.apache.activemq.artemis.utils.collections.PriorityCollection;
    +import org.apache.activemq.artemis.utils.collections.ResettableIterator;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.Set;
    +import java.util.Spliterator;
    +import java.util.concurrent.CopyOnWriteArraySet;
    +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
    +import java.util.function.Consumer;
    +
    +/**
    + * This class's purpose is to hold the consumers.
    + *
    + * CopyOnWriteArraySet is used as the underlying collection to the PriorityCollection, as it is concurrent safe,
    + * but also lock less for a read path, which is our HOT path.
    + * Also it was the underlying collection previously used in QueueImpl, before we abstracted it out to support priority consumers.
    + *
    + * There can only be one resettable iterable view,
    + * A new iterable view is created on modification, this is to keep the read HOT path performent, BUT
    + * the iterable view changes only after reset so changes in the underlying collection are only seen after a reset,
    + *
    + * All other iterators created by iterators() method are not reset-able and are created on delegating iterator().
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware,
    + *         but intent is this is the QueueImpl:ConsumerHolder.
    + */
    +public class QueueConsumersImpl<T extends PriorityAware> implements QueueConsumers<T> {
    +
    +   private final PriorityCollection<T> consumers = new PriorityCollection<>(CopyOnWriteArraySet::new);
    +   private final Collection<T> unmodifiableConsumers = Collections.unmodifiableCollection(consumers);
    +   private final AtomicReferenceFieldUpdater<QueueConsumersImpl, ResettableIterator> changedIteratorFieldUpdater =  AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, ResettableIterator.class, "changedIterator");
    +   private volatile ResettableIterator<T> changedIterator;
    +   private ResettableIterator<T> currentIterator = consumers.resettableIterator();
    +
    +   @Override
    +   public Set<Integer> getPriorites() {
    +      return consumers.getPriorites();
    +   }
    +
    +   @Override
    +   public boolean hasNext() {
    +      return currentIterator.hasNext();
    +   }
    +
    +   @Override
    +   public T next() {
    +      return currentIterator.next();
    +   }
    +
    +   @Override
    +   public QueueConsumers<T> reset() {
    +      if (changedIterator != null) {
    +         currentIterator = changedIterator;
    +         changedIteratorFieldUpdater.compareAndSet(this, changedIterator, null);
    +      } else {
    +         currentIterator.reset();
    --- End diff --
    
    The compare and set is because the new iterator could be created whilst we are mid resetting. The iterator is single thread usage only. Updating the collection should be thread safe and able to occur in parralell.
    
    This is much like iterating copyonwriteareatlist. Iterator works on a snapshot view and this is single thread. Updating the collection itself can be concurrent and occur whilst someone else is iterating 


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246323998
  
    --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiResettableIterator.java ---
    @@ -0,0 +1,44 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.utils.collections;
    +
    +/**
    + * Extends MultiIterator, adding the ability if the underlying iterators are resettable, then its self can reset.
    + * It achieves this by going back to the first iterator, and as moves to another iterator it resets it.
    + *
    + * @param <T> type of the class of the iterator.
    + */
    +public class MultiResettableIterator<T> extends MultiIterator<T> implements ResettableIterator<T> {
    +
    +   public MultiResettableIterator(ResettableIterator<T>[] iterators) {
    +      super(iterators);
    +   }
    +
    +   @Override
    +   protected void moveTo(int index) {
    +      super.moveTo(index);
    +      if (index > -1) {
    +         ((ResettableIterator<T>) get(index)).reset();
    --- End diff --
    
    this cast could be avoided thanks to the changes on generics on `MultiIterator`


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246515810
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java ---
    @@ -2409,14 +2410,10 @@ private void deliver() {
           // Either the iterator is empty or the consumer is busy
           int noDelivery = 0;
     
    -      int size = 0;
    -
    -      int endPos = -1;
    -
           int handled = 0;
     
           long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT;
    -
    +      consumers.reset();
    --- End diff --
    
    Its fine. We actually only want to reset on a succesful handled nomatch or expired. Or before starting iterating


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by gemmellr <gi...@git.apache.org>.
Github user gemmellr commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r245968414
  
    --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerPriorityTest.java ---
    @@ -0,0 +1,298 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.tests.integration.jms.client;
    +
    +import org.apache.activemq.artemis.api.core.RoutingType;
    +import org.apache.activemq.artemis.api.core.SimpleString;
    +import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
    +import org.apache.activemq.artemis.tests.util.JMSTestBase;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +import javax.jms.Connection;
    +import javax.jms.ConnectionFactory;
    +import javax.jms.Destination;
    +import javax.jms.MessageConsumer;
    +import javax.jms.MessageProducer;
    +import javax.jms.Queue;
    +import javax.jms.Session;
    +import javax.jms.TextMessage;
    +import javax.jms.Topic;
    +
    +/**
    + * Exclusive Test
    + */
    +public class ConsumerPriorityTest extends JMSTestBase {
    +
    +   private SimpleString queueName = SimpleString.toSimpleString("jms.consumer.priority.queue");
    --- End diff --
    
    Rather than hard coding a shared name, using the test name for the queue name is nice as it isolates different tests and makes the relationship clear, sometimes makes it easier to work on issues later with particular tests. There is a test name rule in the parent class, and a getName() method that can be used with it.


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by gemmellr <gi...@git.apache.org>.
Github user gemmellr commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246416127
  
    --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java ---
    @@ -0,0 +1,144 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.tests.integration.amqp;
    +
    +import org.apache.activemq.transport.amqp.client.AmqpClient;
    +import org.apache.activemq.transport.amqp.client.AmqpConnection;
    +import org.apache.activemq.transport.amqp.client.AmqpMessage;
    +import org.apache.activemq.transport.amqp.client.AmqpReceiver;
    +import org.apache.activemq.transport.amqp.client.AmqpSession;
    +import org.apache.qpid.proton.amqp.Symbol;
    +import org.apache.qpid.proton.amqp.UnsignedInteger;
    +import org.junit.Test;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Test various behaviors of AMQP receivers with the broker.
    + */
    +public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
    +
    +   @Test(timeout = 30000)
    +   public void testPriority() throws Exception {
    +
    +      AmqpClient client = createAmqpClient();
    +      AmqpConnection connection = addConnection(client.connect());
    +      AmqpSession session = connection.createSession();
    +
    +      Map<Symbol, Object> properties1 = new HashMap<>();
    +      properties1.put(Symbol.getSymbol("priority"), 5);
    +      AmqpReceiver receiver1 = session.createReceiver(getQueueName(), null, false, false, properties1);
    +      receiver1.flow(100);
    +
    +      Map<Symbol, Object> properties2 = new HashMap<>();
    +      properties2.put(Symbol.getSymbol("priority"), 50);
    +      AmqpReceiver receiver2 = session.createReceiver(getQueueName(), null, false, false, properties2);
    +      receiver2.flow(100);
    +
    +      Map<Symbol, Object> properties3 = new HashMap<>();
    +      properties3.put(Symbol.getSymbol("priority"), 10);
    +      AmqpReceiver receiver3 = session.createReceiver(getQueueName(), null, false, false, properties3);
    +      receiver3.flow(100);
    +
    +      sendMessages(getQueueName(), 5);
    +
    +
    +      for (int i = 0; i < 5; i++) {
    +         AmqpMessage message1 = receiver1.receiveNoWait();
    --- End diff --
    
    I believe the test clients receiveNoWait only polls its local queue, so this might now likely sporadically fail due to racing the deliveries. I was only suggesting recieveNoWait as potential initial verification within the loop for non-receiving consumers, to be followed up or substituted by a more stringent final check.


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246323500
  
    --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java ---
    @@ -0,0 +1,78 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.utils.collections;
    +
    +import java.util.Iterator;
    +
    +/**
    + * Provides an Iterator that works over multiple underlying iterators.
    + *
    + * @param <T> type of the class of the iterator.
    + */
    +public class MultiIterator<T> implements Iterator<T> {
    +
    +   private final Iterator<T>[] iterators;
    +   int index = -1;
    +
    +   public MultiIterator(Iterator<T>[] iterators) {
    --- End diff --
    
    `I[]`


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246323811
  
    --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiResettableIterator.java ---
    @@ -0,0 +1,44 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.utils.collections;
    +
    +/**
    + * Extends MultiIterator, adding the ability if the underlying iterators are resettable, then its self can reset.
    + * It achieves this by going back to the first iterator, and as moves to another iterator it resets it.
    + *
    + * @param <T> type of the class of the iterator.
    + */
    +public class MultiResettableIterator<T> extends MultiIterator<T> implements ResettableIterator<T> {
    --- End diff --
    
    ```java
    public class MultiResettableIterator<T> extends MultiIterator<ResettableIterator<T>, T> implements ResettableIterator<T> {
    ```


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246135697
  
    --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java ---
    @@ -233,6 +239,11 @@ public Object createSender(ProtonServerSenderContext protonSender,
           return consumer;
        }
     
    +   private int getPriority(Map<Symbol, Object> properties) {
    +      Integer value = properties == null ? null : (Integer) properties.get(PRIORITY);
    --- End diff --
    
    Have changed if you can recheck to make sure i understood.


---

[GitHub] activemq-artemis issue #2490: V2 196

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on the issue:

    https://github.com/apache/activemq-artemis/pull/2490
  
    @michaelandrepearce Nice! Will take a look today or max tomorrow :+1: 


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246555040
  
    --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java ---
    @@ -0,0 +1,78 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.utils.collections;
    +
    +import java.util.Iterator;
    +
    +/**
    + * Provides an Iterator that works over multiple underlying iterators.
    + *
    + * @param <T> type of the class of the iterator.
    + */
    +public class MultiIterator<T> implements Iterator<T> {
    --- End diff --
    
    OK, so to do this and keep the cleaner user interface what i can do is make an abstract base which has most of this logic in, and have the more ugly generics. And then make MultiIterator extend that which then keeps its cleaner generics, and like wise MultiResettableIterator. Ill update PR to see if this is good compromise, or if you'd prefer me to revert and simply keep the cast i had originally.


---

[GitHub] activemq-artemis issue #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on the issue:

    https://github.com/apache/activemq-artemis/pull/2490
  
    @franz1981 great thanks a million :)


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246128143
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java ---
    @@ -104,6 +119,11 @@ public void decodeRest(final ActiveMQBuffer buffer) {
           filterString = buffer.readNullableSimpleString();
           browseOnly = buffer.readBoolean();
           requiresResponse = buffer.readBoolean();
    +      if (buffer.readableBytes() > 0) {
    --- End diff --
    
    This is typical pattern used for adding safely a new field that can be either nullable or defaultable. Used many times over.


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246337126
  
    --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java ---
    @@ -0,0 +1,319 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.utils.collections;
    +
    +import org.apache.activemq.artemis.core.PriorityAware;
    +
    +import java.lang.reflect.Array;
    +import java.util.AbstractCollection;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Iterator;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.function.Consumer;
    +import java.util.function.Supplier;
    +import java.util.stream.Collectors;
    +
    +/**
    + * This class's purpose is to hold the the different collections used for each priority level.
    + *
    + * A supplier is required to provide the underlying collection needed when a new priority level is seen,
    + * and the end behaviour is that of the underlying collection, e.g. if set add will follow set's add semantics,
    + * if list, then list semantics.
    + *
    + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these.
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware.
    + */
    +public class PriorityCollection<T extends PriorityAware> extends AbstractCollection<T> {
    +
    +   private final Supplier<Collection<T>> supplier;
    +   private volatile PriorityHolder<T>[] priorityHolders = newPrioritySetArrayInstance(0);
    +   private volatile int size;
    +
    +   private void setArray(PriorityHolder<T>[] priorityHolders) {
    +      this.priorityHolders = priorityHolders;
    +   }
    +
    +   private PriorityHolder<T>[] getArray() {
    +      return priorityHolders;
    +   }
    +
    +
    +   public PriorityCollection(Supplier<Collection<T>> supplier) {
    +      this.supplier = supplier;
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> PriorityHolder<T>[] newPrioritySetArrayInstance(int length) {
    +      return (PriorityHolder<T>[]) Array.newInstance(PriorityHolder.class, length);
    +   }
    +
    +   @Override
    +   public int size() {
    +      return size;
    +   }
    +
    +   @Override
    +   public boolean isEmpty() {
    +      return size() == 0;
    +   }
    +
    +   public Set<Integer> getPriorites() {
    +      PriorityHolder<T>[] snapshot = getArray();
    +      return Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet());
    +   }
    +
    +   @Override
    +   public Iterator<T> iterator() {
    +      Iterator<T>[] iterators = getIterators();
    +      return new MultiIterator<>(iterators);
    +   }
    +
    +   private Iterator<T>[] getIterators() {
    +      PriorityHolder<T>[] snapshot = this.getArray();
    +      int size = snapshot.length;
    +      Iterator<T>[] iterators = newIteratorArrayInstance(size);
    +      for (int i = 0; i < size; i++) {
    +         iterators[i] = snapshot[i].getValues().iterator();
    +      }
    +      return iterators;
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> Iterator<T>[] newIteratorArrayInstance(int length) {
    +      return (Iterator<T>[]) Array.newInstance(Iterator.class, length);
    +   }
    +
    +   public ResettableIterator<T> resettableIterator() {
    +      return new MultiResettableIterator<T>(getResettableIterators());
    +   }
    +
    +   private ResettableIterator<T>[] getResettableIterators() {
    +      PriorityHolder<T>[] snapshot = this.getArray();
    +      int size = snapshot.length;
    +      ResettableIterator<T>[] iterators = newResettableIteratorArrayInstance(size);
    +      for (int i = 0; i < size; i++) {
    +         iterators[i] = new ArrayResettableIterator<>(snapshot[i].getValues().toArray());
    +      }
    +      return iterators;
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> ResettableIterator<T>[] newResettableIteratorArrayInstance(int length) {
    +      return (ResettableIterator<T>[]) Array.newInstance(ResettableIterator.class, length);
    +   }
    +
    +   @Override
    +   public void forEach(Consumer<? super T> action) {
    +      Objects.requireNonNull(action);
    +      PriorityHolder<T>[] current = getArray();
    +      int len = current.length;
    +      for (int i = 0; i < len; ++i) {
    +         current[i].getValues().forEach(action);
    +      }
    +   }
    +
    +   private Collection<T> getCollection(int priority, boolean createIfMissing) {
    +      PriorityHolder<T>[] current = getArray();
    +      int low = 0;
    +      int high = current.length - 1;
    +
    +      while (low <= high) {
    +         int mid = (low + high) >>> 1;
    +         PriorityHolder<T> midVal = current[mid];
    +
    +         if (midVal.getPriority() > priority)
    +            low = mid + 1;
    +         else if (midVal.getPriority() < priority)
    +            high = mid - 1;
    +         else
    +            return midVal.getValues(); //key found
    +      }
    +
    +      if (createIfMissing) {
    +         PriorityHolder<T>[] newArray = newPrioritySetArrayInstance(current.length + 1);
    +         if (low > 0) {
    +            System.arraycopy(current, 0, newArray, 0, low);
    +         }
    +         if (current.length - low > 0) {
    +            System.arraycopy(current, low, newArray, low + 1, current.length - low);
    +         }
    +         newArray[low] = new PriorityHolder<T>(priority, supplier);
    +         setArray(newArray);
    +         return newArray[low].getValues();
    +      }
    +      return null;
    +   }
    +
    +   @Override
    +   public synchronized boolean add(T t) {
    +      boolean result = addInternal(t);
    +      calcSize();
    +      return result;
    +   }
    +
    +   private boolean addInternal(T t) {
    +      if (t == null) return false;
    +      Collection<T> priority = getCollection(t.getPriority(), true);
    +      return priority.add(t);
    +   }
    +
    +   @Override
    +   public boolean remove(Object o) {
    +      return o instanceof PriorityAware && remove((PriorityAware) o);
    +   }
    +
    +   public synchronized boolean remove(PriorityAware priorityAware) {
    +      boolean result = removeInternal(priorityAware);
    +      calcSize();
    +      return result;
    +   }
    +
    +   private boolean removeInternal(PriorityAware priorityAware) {
    +      if ( priorityAware == null) return false;
    +      Collection<T> priority = getCollection(priorityAware.getPriority(), false);
    +      boolean result = priority != null && priority.remove(priorityAware);
    +      if (priority != null && priority.size() == 0) {
    +         removeCollection(priorityAware.getPriority());
    +      }
    +      return result;
    +   }
    +
    +   private Collection<T> removeCollection(int priority) {
    +      PriorityHolder<T>[] current = getArray();
    +      int len = current.length;
    +      int low = 0;
    +      int high = len - 1;
    +
    +      while (low <= high) {
    +         int mid = (low + high) >>> 1;
    --- End diff --
    
    can use just `/2`: the compiler is smart enough to use the shift :+1:  


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246523218
  
    --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java ---
    @@ -0,0 +1,78 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.utils.collections;
    +
    +import java.util.Iterator;
    +
    +/**
    + * Provides an Iterator that works over multiple underlying iterators.
    + *
    + * @param <T> type of the class of the iterator.
    + */
    +public class MultiIterator<T> implements Iterator<T> {
    +
    +   private final Iterator<T>[] iterators;
    +   int index = -1;
    --- End diff --
    
    Agreed


---

[GitHub] activemq-artemis issue #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on the issue:

    https://github.com/apache/activemq-artemis/pull/2490
  
    @gemmellr ok will reduce back down the extra tests i added. To those you suggest. 
    
    Also re activemq5. Fair enough, ill reduce / alter this then.


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246515455
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java ---
    @@ -2497,45 +2484,36 @@ private void deliver() {
     
     
                       handled++;
    -
    +                  consumers.reset();
                       continue;
                    }
     
                    if (logger.isTraceEnabled()) {
                       logger.trace("Queue " + this.getName() + " is delivering reference " + ref);
                    }
     
    -               // If a group id is set, then this overrides the consumer chosen round-robin
    -
    -               SimpleString groupID = extractGroupID(ref);
    -
    -               if (groupID != null) {
    -                  groupConsumer = groups.get(groupID);
    +               final SimpleString groupID = extractGroupID(ref);
    +               groupConsumer = getGroupConsumer(groupConsumer, groupID);
     
    -                  if (groupConsumer != null) {
    -                     consumer = groupConsumer;
    -                  }
    -               }
    -
    -               if (exclusive && redistributor == null) {
    -                  consumer = consumerList.get(0).consumer;
    +               if (groupConsumer != null) {
    +                  consumer = groupConsumer;
                    }
     
                    HandleStatus status = handle(ref, consumer);
     
                    if (status == HandleStatus.HANDLED) {
     
    -                  deliveriesInTransit.countUp();
    -
    -                  handledconsumer = consumer;
    -
    -                  removeMessageReference(holder, ref);
    -
                       if (redistributor == null) {
                          handleMessageGroup(ref, consumer, groupConsumer, groupID);
                       }
     
    +                  deliveriesInTransit.countUp();
    +
    +
    +                  removeMessageReference(holder, ref);
    +                  handledconsumer = consumer;
                       handled++;
    +                  consumers.reset();
    --- End diff --
    
    Yes its fine. Would have no negative effect, and actually have same behaviour as old. 


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246513536
  
    --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java ---
    @@ -0,0 +1,319 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.utils.collections;
    +
    +import org.apache.activemq.artemis.core.PriorityAware;
    +
    +import java.lang.reflect.Array;
    +import java.util.AbstractCollection;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Iterator;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.function.Consumer;
    +import java.util.function.Supplier;
    +import java.util.stream.Collectors;
    +
    +/**
    + * This class's purpose is to hold the the different collections used for each priority level.
    + *
    + * A supplier is required to provide the underlying collection needed when a new priority level is seen,
    + * and the end behaviour is that of the underlying collection, e.g. if set add will follow set's add semantics,
    + * if list, then list semantics.
    + *
    + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these.
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware.
    + */
    +public class PriorityCollection<T extends PriorityAware> extends AbstractCollection<T> {
    +
    +   private final Supplier<Collection<T>> supplier;
    +   private volatile PriorityHolder<T>[] priorityHolders = newPrioritySetArrayInstance(0);
    +   private volatile int size;
    +
    +   private void setArray(PriorityHolder<T>[] priorityHolders) {
    +      this.priorityHolders = priorityHolders;
    +   }
    +
    +   private PriorityHolder<T>[] getArray() {
    +      return priorityHolders;
    +   }
    +
    +
    +   public PriorityCollection(Supplier<Collection<T>> supplier) {
    +      this.supplier = supplier;
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> PriorityHolder<T>[] newPrioritySetArrayInstance(int length) {
    +      return (PriorityHolder<T>[]) Array.newInstance(PriorityHolder.class, length);
    +   }
    +
    +   @Override
    +   public int size() {
    +      return size;
    +   }
    +
    +   @Override
    +   public boolean isEmpty() {
    +      return size() == 0;
    +   }
    +
    +   public Set<Integer> getPriorites() {
    +      PriorityHolder<T>[] snapshot = getArray();
    +      return Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet());
    +   }
    +
    +   @Override
    +   public Iterator<T> iterator() {
    +      Iterator<T>[] iterators = getIterators();
    +      return new MultiIterator<>(iterators);
    +   }
    +
    +   private Iterator<T>[] getIterators() {
    +      PriorityHolder<T>[] snapshot = this.getArray();
    +      int size = snapshot.length;
    +      Iterator<T>[] iterators = newIteratorArrayInstance(size);
    +      for (int i = 0; i < size; i++) {
    +         iterators[i] = snapshot[i].getValues().iterator();
    +      }
    +      return iterators;
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> Iterator<T>[] newIteratorArrayInstance(int length) {
    +      return (Iterator<T>[]) Array.newInstance(Iterator.class, length);
    +   }
    +
    +   public ResettableIterator<T> resettableIterator() {
    +      return new MultiResettableIterator<T>(getResettableIterators());
    +   }
    +
    +   private ResettableIterator<T>[] getResettableIterators() {
    +      PriorityHolder<T>[] snapshot = this.getArray();
    +      int size = snapshot.length;
    +      ResettableIterator<T>[] iterators = newResettableIteratorArrayInstance(size);
    +      for (int i = 0; i < size; i++) {
    +         iterators[i] = new ArrayResettableIterator<>(snapshot[i].getValues().toArray());
    +      }
    +      return iterators;
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> ResettableIterator<T>[] newResettableIteratorArrayInstance(int length) {
    +      return (ResettableIterator<T>[]) Array.newInstance(ResettableIterator.class, length);
    +   }
    +
    +   @Override
    +   public void forEach(Consumer<? super T> action) {
    +      Objects.requireNonNull(action);
    +      PriorityHolder<T>[] current = getArray();
    +      int len = current.length;
    +      for (int i = 0; i < len; ++i) {
    +         current[i].getValues().forEach(action);
    +      }
    +   }
    +
    +   private Collection<T> getCollection(int priority, boolean createIfMissing) {
    +      PriorityHolder<T>[] current = getArray();
    +      int low = 0;
    +      int high = current.length - 1;
    +
    +      while (low <= high) {
    +         int mid = (low + high) >>> 1;
    +         PriorityHolder<T> midVal = current[mid];
    +
    +         if (midVal.getPriority() > priority)
    +            low = mid + 1;
    +         else if (midVal.getPriority() < priority)
    +            high = mid - 1;
    +         else
    +            return midVal.getValues(); //key found
    +      }
    +
    +      if (createIfMissing) {
    +         PriorityHolder<T>[] newArray = newPrioritySetArrayInstance(current.length + 1);
    +         if (low > 0) {
    +            System.arraycopy(current, 0, newArray, 0, low);
    +         }
    +         if (current.length - low > 0) {
    +            System.arraycopy(current, low, newArray, low + 1, current.length - low);
    +         }
    +         newArray[low] = new PriorityHolder<T>(priority, supplier);
    +         setArray(newArray);
    +         return newArray[low].getValues();
    +      }
    +      return null;
    +   }
    +
    +   @Override
    +   public synchronized boolean add(T t) {
    --- End diff --
    
    This needs to be concurrent safe on modifications


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246382311
  
    --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java ---
    @@ -0,0 +1,319 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.utils.collections;
    +
    +import org.apache.activemq.artemis.core.PriorityAware;
    +
    +import java.lang.reflect.Array;
    +import java.util.AbstractCollection;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Iterator;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.function.Consumer;
    +import java.util.function.Supplier;
    +import java.util.stream.Collectors;
    +
    +/**
    + * This class's purpose is to hold the the different collections used for each priority level.
    + *
    + * A supplier is required to provide the underlying collection needed when a new priority level is seen,
    + * and the end behaviour is that of the underlying collection, e.g. if set add will follow set's add semantics,
    + * if list, then list semantics.
    + *
    + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these.
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware.
    + */
    +public class PriorityCollection<T extends PriorityAware> extends AbstractCollection<T> {
    +
    +   private final Supplier<Collection<T>> supplier;
    +   private volatile PriorityHolder<T>[] priorityHolders = newPrioritySetArrayInstance(0);
    +   private volatile int size;
    +
    +   private void setArray(PriorityHolder<T>[] priorityHolders) {
    +      this.priorityHolders = priorityHolders;
    +   }
    +
    +   private PriorityHolder<T>[] getArray() {
    +      return priorityHolders;
    +   }
    +
    +
    +   public PriorityCollection(Supplier<Collection<T>> supplier) {
    +      this.supplier = supplier;
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> PriorityHolder<T>[] newPrioritySetArrayInstance(int length) {
    +      return (PriorityHolder<T>[]) Array.newInstance(PriorityHolder.class, length);
    +   }
    +
    +   @Override
    +   public int size() {
    +      return size;
    +   }
    +
    +   @Override
    +   public boolean isEmpty() {
    +      return size() == 0;
    +   }
    +
    +   public Set<Integer> getPriorites() {
    +      PriorityHolder<T>[] snapshot = getArray();
    +      return Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet());
    +   }
    +
    +   @Override
    +   public Iterator<T> iterator() {
    +      Iterator<T>[] iterators = getIterators();
    +      return new MultiIterator<>(iterators);
    +   }
    +
    +   private Iterator<T>[] getIterators() {
    +      PriorityHolder<T>[] snapshot = this.getArray();
    +      int size = snapshot.length;
    +      Iterator<T>[] iterators = newIteratorArrayInstance(size);
    +      for (int i = 0; i < size; i++) {
    +         iterators[i] = snapshot[i].getValues().iterator();
    +      }
    +      return iterators;
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> Iterator<T>[] newIteratorArrayInstance(int length) {
    +      return (Iterator<T>[]) Array.newInstance(Iterator.class, length);
    +   }
    +
    +   public ResettableIterator<T> resettableIterator() {
    +      return new MultiResettableIterator<T>(getResettableIterators());
    +   }
    +
    +   private ResettableIterator<T>[] getResettableIterators() {
    +      PriorityHolder<T>[] snapshot = this.getArray();
    +      int size = snapshot.length;
    +      ResettableIterator<T>[] iterators = newResettableIteratorArrayInstance(size);
    +      for (int i = 0; i < size; i++) {
    +         iterators[i] = new ArrayResettableIterator<>(snapshot[i].getValues().toArray());
    +      }
    +      return iterators;
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> ResettableIterator<T>[] newResettableIteratorArrayInstance(int length) {
    +      return (ResettableIterator<T>[]) Array.newInstance(ResettableIterator.class, length);
    +   }
    +
    +   @Override
    +   public void forEach(Consumer<? super T> action) {
    +      Objects.requireNonNull(action);
    +      PriorityHolder<T>[] current = getArray();
    +      int len = current.length;
    +      for (int i = 0; i < len; ++i) {
    +         current[i].getValues().forEach(action);
    +      }
    +   }
    +
    +   private Collection<T> getCollection(int priority, boolean createIfMissing) {
    +      PriorityHolder<T>[] current = getArray();
    +      int low = 0;
    +      int high = current.length - 1;
    +
    +      while (low <= high) {
    +         int mid = (low + high) >>> 1;
    +         PriorityHolder<T> midVal = current[mid];
    +
    +         if (midVal.getPriority() > priority)
    +            low = mid + 1;
    +         else if (midVal.getPriority() < priority)
    +            high = mid - 1;
    +         else
    +            return midVal.getValues(); //key found
    +      }
    +
    +      if (createIfMissing) {
    +         PriorityHolder<T>[] newArray = newPrioritySetArrayInstance(current.length + 1);
    +         if (low > 0) {
    +            System.arraycopy(current, 0, newArray, 0, low);
    +         }
    +         if (current.length - low > 0) {
    +            System.arraycopy(current, low, newArray, low + 1, current.length - low);
    +         }
    +         newArray[low] = new PriorityHolder<T>(priority, supplier);
    +         setArray(newArray);
    +         return newArray[low].getValues();
    +      }
    +      return null;
    +   }
    +
    +   @Override
    +   public synchronized boolean add(T t) {
    --- End diff --
    
    Looking to the usage on `QueueImpl` it seems that is being always called into a `synchronized (this) {` block ie it doesn't need to be `synchronized` and same same for `remove`. 
    If we allow `snapshotting` from a different thread is fine, but I would document it instead.
    I know that's a less safe approach than putting `sychronized` all around, but I do believe that we would have to do the same on  `LinkedListImpl` for this same reason if we need to `protect` correct usage at any cost.


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by gemmellr <gi...@git.apache.org>.
Github user gemmellr commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r245953999
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java ---
    @@ -52,6 +57,7 @@ public String toString() {
           StringBuffer buff = new StringBuffer(getParentString());
           buff.append(", queueName=" + queueName);
           buff.append(", filterString=" + filterString);
    +      buff.append(", priority=" + priority);
    --- End diff --
    
    Nitpicking, the other details seem to be emitted 'in order' relative to the buffer content, so would it make sense to put this at the end consistent with its location?


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246132135
  
    --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/QueueConsumerPriorityTest.java ---
    @@ -0,0 +1,65 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.tests.integration.openwire.amq;
    +
    +import javax.jms.JMSException;
    +import javax.jms.Message;
    +import javax.jms.MessageConsumer;
    +import javax.jms.MessageProducer;
    +import javax.jms.Session;
    +
    +
    +import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
    +import org.apache.activemq.command.ActiveMQQueue;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +public class QueueConsumerPriorityTest extends BasicOpenWireTest {
    +
    +
    +   @Override
    +   @Before
    +   public void setUp() throws Exception {
    +      super.setUp();
    +      this.makeSureCoreQueueExist("QUEUE.A");
    +   }
    +   @Test
    +   public void testQueueConsumerPriority() throws JMSException, InterruptedException {
    +      connection.start();
    +      Session consumerLowPriority = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    +      Session consumerHighPriority = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    +      assertNotNull(consumerHighPriority);
    +      Session senderSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    +      String queueName = "QUEUE.A";
    +      ActiveMQQueue low = new ActiveMQQueue(queueName + "?consumer.priority=1");
    +      MessageConsumer lowConsumer = consumerLowPriority.createConsumer(low);
    +
    +      ActiveMQQueue high = new ActiveMQQueue(queueName + "?consumer.priority=2");
    +      MessageConsumer highConsumer = consumerLowPriority.createConsumer(high);
    +
    +      ActiveMQQueue senderQueue = new ActiveMQQueue(queueName);
    +
    +      MessageProducer producer = senderSession.createProducer(senderQueue);
    +
    +      Message msg = senderSession.createTextMessage("test");
    +      for (int i = 0; i < 1000; i++) {
    +         producer.send(msg);
    +         assertNotNull("null on iteration: " + i, highConsumer.receive(1000));
    +      }
    +      assertNull(lowConsumer.receive(2000));
    --- End diff --
    
    this is the original test from ActiveMQ5 i was trying to keep this test as much un-touched as possible to ensure behavior is the same.


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246557671
  
    --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java ---
    @@ -0,0 +1,319 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.utils.collections;
    +
    +import org.apache.activemq.artemis.core.PriorityAware;
    +
    +import java.lang.reflect.Array;
    +import java.util.AbstractCollection;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Iterator;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.function.Consumer;
    +import java.util.function.Supplier;
    +import java.util.stream.Collectors;
    +
    +/**
    + * This class's purpose is to hold the the different collections used for each priority level.
    + *
    + * A supplier is required to provide the underlying collection needed when a new priority level is seen,
    + * and the end behaviour is that of the underlying collection, e.g. if set add will follow set's add semantics,
    + * if list, then list semantics.
    + *
    + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these.
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware.
    + */
    +public class PriorityCollection<T extends PriorityAware> extends AbstractCollection<T> {
    +
    +   private final Supplier<Collection<T>> supplier;
    +   private volatile PriorityHolder<T>[] priorityHolders = newPrioritySetArrayInstance(0);
    +   private volatile int size;
    +
    +   private void setArray(PriorityHolder<T>[] priorityHolders) {
    +      this.priorityHolders = priorityHolders;
    +   }
    +
    +   private PriorityHolder<T>[] getArray() {
    +      return priorityHolders;
    +   }
    +
    +
    +   public PriorityCollection(Supplier<Collection<T>> supplier) {
    +      this.supplier = supplier;
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> PriorityHolder<T>[] newPrioritySetArrayInstance(int length) {
    +      return (PriorityHolder<T>[]) Array.newInstance(PriorityHolder.class, length);
    +   }
    +
    +   @Override
    +   public int size() {
    +      return size;
    +   }
    +
    +   @Override
    +   public boolean isEmpty() {
    +      return size() == 0;
    +   }
    +
    +   public Set<Integer> getPriorites() {
    +      PriorityHolder<T>[] snapshot = getArray();
    +      return Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet());
    +   }
    +
    +   @Override
    +   public Iterator<T> iterator() {
    +      Iterator<T>[] iterators = getIterators();
    +      return new MultiIterator<>(iterators);
    +   }
    +
    +   private Iterator<T>[] getIterators() {
    +      PriorityHolder<T>[] snapshot = this.getArray();
    +      int size = snapshot.length;
    +      Iterator<T>[] iterators = newIteratorArrayInstance(size);
    +      for (int i = 0; i < size; i++) {
    +         iterators[i] = snapshot[i].getValues().iterator();
    +      }
    +      return iterators;
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> Iterator<T>[] newIteratorArrayInstance(int length) {
    +      return (Iterator<T>[]) Array.newInstance(Iterator.class, length);
    +   }
    +
    +   public ResettableIterator<T> resettableIterator() {
    +      return new MultiResettableIterator<T>(getResettableIterators());
    +   }
    +
    +   private ResettableIterator<T>[] getResettableIterators() {
    +      PriorityHolder<T>[] snapshot = this.getArray();
    +      int size = snapshot.length;
    +      ResettableIterator<T>[] iterators = newResettableIteratorArrayInstance(size);
    +      for (int i = 0; i < size; i++) {
    +         iterators[i] = new ArrayResettableIterator<>(snapshot[i].getValues().toArray());
    +      }
    +      return iterators;
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> ResettableIterator<T>[] newResettableIteratorArrayInstance(int length) {
    +      return (ResettableIterator<T>[]) Array.newInstance(ResettableIterator.class, length);
    +   }
    +
    +   @Override
    +   public void forEach(Consumer<? super T> action) {
    +      Objects.requireNonNull(action);
    +      PriorityHolder<T>[] current = getArray();
    +      int len = current.length;
    +      for (int i = 0; i < len; ++i) {
    +         current[i].getValues().forEach(action);
    +      }
    +   }
    +
    +   private Collection<T> getCollection(int priority, boolean createIfMissing) {
    +      PriorityHolder<T>[] current = getArray();
    +      int low = 0;
    +      int high = current.length - 1;
    +
    +      while (low <= high) {
    +         int mid = (low + high) >>> 1;
    +         PriorityHolder<T> midVal = current[mid];
    +
    +         if (midVal.getPriority() > priority)
    +            low = mid + 1;
    +         else if (midVal.getPriority() < priority)
    +            high = mid - 1;
    +         else
    +            return midVal.getValues(); //key found
    +      }
    +
    +      if (createIfMissing) {
    +         PriorityHolder<T>[] newArray = newPrioritySetArrayInstance(current.length + 1);
    +         if (low > 0) {
    +            System.arraycopy(current, 0, newArray, 0, low);
    +         }
    +         if (current.length - low > 0) {
    +            System.arraycopy(current, low, newArray, low + 1, current.length - low);
    +         }
    +         newArray[low] = new PriorityHolder<T>(priority, supplier);
    +         setArray(newArray);
    +         return newArray[low].getValues();
    +      }
    +      return null;
    +   }
    +
    +   @Override
    +   public synchronized boolean add(T t) {
    +      boolean result = addInternal(t);
    +      calcSize();
    +      return result;
    +   }
    +
    +   private boolean addInternal(T t) {
    +      if (t == null) return false;
    +      Collection<T> priority = getCollection(t.getPriority(), true);
    +      return priority.add(t);
    +   }
    +
    +   @Override
    +   public boolean remove(Object o) {
    +      return o instanceof PriorityAware && remove((PriorityAware) o);
    +   }
    +
    +   public synchronized boolean remove(PriorityAware priorityAware) {
    +      boolean result = removeInternal(priorityAware);
    +      calcSize();
    +      return result;
    +   }
    +
    +   private boolean removeInternal(PriorityAware priorityAware) {
    +      if ( priorityAware == null) return false;
    +      Collection<T> priority = getCollection(priorityAware.getPriority(), false);
    +      boolean result = priority != null && priority.remove(priorityAware);
    +      if (priority != null && priority.size() == 0) {
    +         removeCollection(priorityAware.getPriority());
    +      }
    +      return result;
    +   }
    +
    +   private Collection<T> removeCollection(int priority) {
    +      PriorityHolder<T>[] current = getArray();
    +      int len = current.length;
    +      int low = 0;
    +      int high = len - 1;
    +
    +      while (low <= high) {
    +         int mid = (low + high) >>> 1;
    --- End diff --
    
    I know but the reason is because some optimisations were not present in old JDK version and the classes aren't changed...


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246127864
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java ---
    @@ -52,6 +57,7 @@ public String toString() {
           StringBuffer buff = new StringBuffer(getParentString());
           buff.append(", queueName=" + queueName);
           buff.append(", filterString=" + filterString);
    +      buff.append(", priority=" + priority);
    --- End diff --
    
    Makes sense


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by gemmellr <gi...@git.apache.org>.
Github user gemmellr commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r245972322
  
    --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/QueueConsumerPriorityTest.java ---
    @@ -0,0 +1,65 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.tests.integration.openwire.amq;
    +
    +import javax.jms.JMSException;
    +import javax.jms.Message;
    +import javax.jms.MessageConsumer;
    +import javax.jms.MessageProducer;
    +import javax.jms.Session;
    +
    +
    +import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
    +import org.apache.activemq.command.ActiveMQQueue;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +public class QueueConsumerPriorityTest extends BasicOpenWireTest {
    +
    +
    +   @Override
    +   @Before
    +   public void setUp() throws Exception {
    +      super.setUp();
    +      this.makeSureCoreQueueExist("QUEUE.A");
    +   }
    +   @Test
    +   public void testQueueConsumerPriority() throws JMSException, InterruptedException {
    +      connection.start();
    +      Session consumerLowPriority = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    +      Session consumerHighPriority = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    +      assertNotNull(consumerHighPriority);
    +      Session senderSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    +      String queueName = "QUEUE.A";
    +      ActiveMQQueue low = new ActiveMQQueue(queueName + "?consumer.priority=1");
    +      MessageConsumer lowConsumer = consumerLowPriority.createConsumer(low);
    +
    +      ActiveMQQueue high = new ActiveMQQueue(queueName + "?consumer.priority=2");
    +      MessageConsumer highConsumer = consumerLowPriority.createConsumer(high);
    +
    +      ActiveMQQueue senderQueue = new ActiveMQQueue(queueName);
    +
    +      MessageProducer producer = senderSession.createProducer(senderQueue);
    +
    +      Message msg = senderSession.createTextMessage("test");
    +      for (int i = 0; i < 1000; i++) {
    +         producer.send(msg);
    +         assertNotNull("null on iteration: " + i, highConsumer.receive(1000));
    +      }
    +      assertNull(lowConsumer.receive(2000));
    --- End diff --
    
    Would a receiveNoWait (either in or outside the loop) like the other tests be nicer than burning 2 seconds? Slow tests is a key reason eventually noone wants to runs the tests :)


---

[GitHub] activemq-artemis issue #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on the issue:

    https://github.com/apache/activemq-artemis/pull/2490
  
    @franz1981 just ignore class comments, theyre the originals still, ill need to change, but wanted to get to you quickly so you have chance to look over. If you think this is better ill make final tidyup bits, such as class comments and replace the real PR's branch. 


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246133832
  
    --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerPriorityTest.java ---
    @@ -0,0 +1,298 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.tests.integration.jms.client;
    +
    +import org.apache.activemq.artemis.api.core.RoutingType;
    +import org.apache.activemq.artemis.api.core.SimpleString;
    +import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
    +import org.apache.activemq.artemis.tests.util.JMSTestBase;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +import javax.jms.Connection;
    +import javax.jms.ConnectionFactory;
    +import javax.jms.Destination;
    +import javax.jms.MessageConsumer;
    +import javax.jms.MessageProducer;
    +import javax.jms.Queue;
    +import javax.jms.Session;
    +import javax.jms.TextMessage;
    +import javax.jms.Topic;
    +
    +/**
    + * Exclusive Test
    + */
    +public class ConsumerPriorityTest extends JMSTestBase {
    +
    +   private SimpleString queueName = SimpleString.toSimpleString("jms.consumer.priority.queue");
    --- End diff --
    
    Nice i didnt know about that in the parent class. will change to use this..


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246140214
  
    --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java ---
    @@ -0,0 +1,90 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.tests.integration.amqp;
    +
    +import org.apache.activemq.transport.amqp.client.AmqpClient;
    +import org.apache.activemq.transport.amqp.client.AmqpConnection;
    +import org.apache.activemq.transport.amqp.client.AmqpMessage;
    +import org.apache.activemq.transport.amqp.client.AmqpReceiver;
    +import org.apache.activemq.transport.amqp.client.AmqpSession;
    +import org.apache.qpid.proton.amqp.Symbol;
    +import org.junit.Test;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Test various behaviors of AMQP receivers with the broker.
    + */
    +public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
    +
    +   @Test(timeout = 30000)
    +   public void testPriority() throws Exception {
    +
    +      AmqpClient client = createAmqpClient();
    +      AmqpConnection connection = addConnection(client.connect());
    +      AmqpSession session = connection.createSession();
    +
    +      Map<Symbol, Object> properties1 = new HashMap<>();
    +      properties1.put(Symbol.getSymbol("priority"), 50);
    --- End diff --
    
    changed up the ordering in this test also.


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246558298
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java ---
    @@ -0,0 +1,124 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.core.server.impl;
    +
    +import org.apache.activemq.artemis.core.PriorityAware;
    +import org.apache.activemq.artemis.utils.collections.PriorityCollection;
    +import org.apache.activemq.artemis.utils.collections.ResettableIterator;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.Set;
    +import java.util.Spliterator;
    +import java.util.concurrent.CopyOnWriteArraySet;
    +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
    +import java.util.function.Consumer;
    +
    +/**
    + * This class's purpose is to hold the consumers.
    + *
    + * CopyOnWriteArraySet is used as the underlying collection to the PriorityCollection, as it is concurrent safe,
    + * but also lock less for a read path, which is our HOT path.
    + * Also it was the underlying collection previously used in QueueImpl, before we abstracted it out to support priority consumers.
    + *
    + * There can only be one resettable iterable view,
    + * A new iterable view is created on modification, this is to keep the read HOT path performent, BUT
    + * the iterable view changes only after reset so changes in the underlying collection are only seen after a reset,
    + *
    + * All other iterators created by iterators() method are not reset-able and are created on delegating iterator().
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware,
    + *         but intent is this is the QueueImpl:ConsumerHolder.
    + */
    +public class QueueConsumersImpl<T extends PriorityAware> implements QueueConsumers<T> {
    +
    +   private final PriorityCollection<T> consumers = new PriorityCollection<>(CopyOnWriteArraySet::new);
    +   private final Collection<T> unmodifiableConsumers = Collections.unmodifiableCollection(consumers);
    +   private final AtomicReferenceFieldUpdater<QueueConsumersImpl, ResettableIterator> changedIteratorFieldUpdater =  AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, ResettableIterator.class, "changedIterator");
    +   private volatile ResettableIterator<T> changedIterator;
    +   private ResettableIterator<T> currentIterator = consumers.resettableIterator();
    +
    +   @Override
    +   public Set<Integer> getPriorites() {
    +      return consumers.getPriorites();
    +   }
    +
    +   @Override
    +   public boolean hasNext() {
    +      return currentIterator.hasNext();
    +   }
    +
    +   @Override
    +   public T next() {
    +      return currentIterator.next();
    +   }
    +
    +   @Override
    +   public QueueConsumers<T> reset() {
    +      if (changedIterator != null) {
    +         currentIterator = changedIterator;
    +         changedIteratorFieldUpdater.compareAndSet(this, changedIterator, null);
    --- End diff --
    
    currentIterator is ONLY updated by reset, which should ONLY be called by the same threads operating on the ResettableIterator interface.
    
    When add or remove of consumer occurs a new iterator is parked into the volatile changedIterator (using an atomic field updater), so the next reset can pick it up. 



---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by gemmellr <gi...@git.apache.org>.
Github user gemmellr commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r245965929
  
    --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java ---
    @@ -233,6 +239,11 @@ public Object createSender(ProtonServerSenderContext protonSender,
           return consumer;
        }
     
    +   private int getPriority(Map<Symbol, Object> properties) {
    +      Integer value = properties == null ? null : (Integer) properties.get(PRIORITY);
    --- End diff --
    
    Comments on the original #2488 PR suggest you want to align with Qpid Broker-J in this area. Its support (and the accompanying documentation lift) notes as an integral value, so the value here is not necessarily going to be the Integer type.


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246344280
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java ---
    @@ -84,6 +94,10 @@ public void setFilterString(SimpleString filterString) {
           this.filterString = filterString;
        }
     
    +   public void setPriority(byte priority) {
    --- End diff --
    
    this modifier is needed? I can't see it to be called by anyone: if not `priority` filed could be declared as `final`


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246551345
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java ---
    @@ -0,0 +1,124 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.core.server.impl;
    +
    +import org.apache.activemq.artemis.core.PriorityAware;
    +import org.apache.activemq.artemis.utils.collections.PriorityCollection;
    +import org.apache.activemq.artemis.utils.collections.ResettableIterator;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.Set;
    +import java.util.Spliterator;
    +import java.util.concurrent.CopyOnWriteArraySet;
    +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
    +import java.util.function.Consumer;
    +
    +/**
    + * This class's purpose is to hold the consumers.
    + *
    + * CopyOnWriteArraySet is used as the underlying collection to the PriorityCollection, as it is concurrent safe,
    + * but also lock less for a read path, which is our HOT path.
    + * Also it was the underlying collection previously used in QueueImpl, before we abstracted it out to support priority consumers.
    + *
    + * There can only be one resettable iterable view,
    + * A new iterable view is created on modification, this is to keep the read HOT path performent, BUT
    + * the iterable view changes only after reset so changes in the underlying collection are only seen after a reset,
    + *
    + * All other iterators created by iterators() method are not reset-able and are created on delegating iterator().
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware,
    + *         but intent is this is the QueueImpl:ConsumerHolder.
    + */
    +public class QueueConsumersImpl<T extends PriorityAware> implements QueueConsumers<T> {
    +
    +   private final PriorityCollection<T> consumers = new PriorityCollection<>(CopyOnWriteArraySet::new);
    +   private final Collection<T> unmodifiableConsumers = Collections.unmodifiableCollection(consumers);
    +   private final AtomicReferenceFieldUpdater<QueueConsumersImpl, ResettableIterator> changedIteratorFieldUpdater =  AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, ResettableIterator.class, "changedIterator");
    +   private volatile ResettableIterator<T> changedIterator;
    +   private ResettableIterator<T> currentIterator = consumers.resettableIterator();
    +
    +   @Override
    +   public Set<Integer> getPriorites() {
    +      return consumers.getPriorites();
    +   }
    +
    +   @Override
    +   public boolean hasNext() {
    +      return currentIterator.hasNext();
    +   }
    +
    +   @Override
    +   public T next() {
    +      return currentIterator.next();
    +   }
    +
    +   @Override
    +   public QueueConsumers<T> reset() {
    +      if (changedIterator != null) {
    +         currentIterator = changedIterator;
    +         changedIteratorFieldUpdater.compareAndSet(this, changedIterator, null);
    --- End diff --
    
    Tomorrow I will give another look but it doesn't seems correct to me if this code can be run by different threads: currentIterator is not guarded by any atomic operations to be safely published and changedIterator is volatile load 3 times before hitting the case...


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246383235
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java ---
    @@ -113,6 +133,7 @@ public int hashCode() {
           result = prime * result + (browseOnly ? 1231 : 1237);
           result = prime * result + ((filterString == null) ? 0 : filterString.hashCode());
           result = prime * result + (int) (id ^ (id >>> 32));
    +      result = prime * result + priority;
    --- End diff --
    
    I would uses `Integer::hashCode(priority)` that would `agitate` the value 


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246520110
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java ---
    @@ -0,0 +1,124 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.core.server.impl;
    +
    +import org.apache.activemq.artemis.core.PriorityAware;
    +import org.apache.activemq.artemis.utils.collections.PriorityCollection;
    +import org.apache.activemq.artemis.utils.collections.ResettableIterator;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.Set;
    +import java.util.Spliterator;
    +import java.util.concurrent.CopyOnWriteArraySet;
    +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
    +import java.util.function.Consumer;
    +
    +/**
    + * This class's purpose is to hold the consumers.
    + *
    + * CopyOnWriteArraySet is used as the underlying collection to the PriorityCollection, as it is concurrent safe,
    + * but also lock less for a read path, which is our HOT path.
    + * Also it was the underlying collection previously used in QueueImpl, before we abstracted it out to support priority consumers.
    + *
    + * There can only be one resettable iterable view,
    + * A new iterable view is created on modification, this is to keep the read HOT path performent, BUT
    + * the iterable view changes only after reset so changes in the underlying collection are only seen after a reset,
    + *
    + * All other iterators created by iterators() method are not reset-able and are created on delegating iterator().
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware,
    + *         but intent is this is the QueueImpl:ConsumerHolder.
    + */
    +public class QueueConsumersImpl<T extends PriorityAware> implements QueueConsumers<T> {
    +
    +   private final PriorityCollection<T> consumers = new PriorityCollection<>(CopyOnWriteArraySet::new);
    +   private final Collection<T> unmodifiableConsumers = Collections.unmodifiableCollection(consumers);
    +   private final AtomicReferenceFieldUpdater<QueueConsumersImpl, ResettableIterator> changedIteratorFieldUpdater =  AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, ResettableIterator.class, "changedIterator");
    +   private volatile ResettableIterator<T> changedIterator;
    +   private ResettableIterator<T> currentIterator = consumers.resettableIterator();
    +
    +   @Override
    +   public Set<Integer> getPriorites() {
    +      return consumers.getPriorites();
    +   }
    +
    +   @Override
    +   public boolean hasNext() {
    +      return currentIterator.hasNext();
    +   }
    +
    +   @Override
    +   public T next() {
    +      return currentIterator.next();
    +   }
    +
    +   @Override
    +   public QueueConsumers<T> reset() {
    +      if (changedIterator != null) {
    +         currentIterator = changedIterator;
    +         changedIteratorFieldUpdater.compareAndSet(this, changedIterator, null);
    +      } else {
    +         currentIterator.reset();
    --- End diff --
    
    So it is a good reason to save using a `compareAndSet` that's not only expensive but give the false illusion that the code is thread-safe


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246321398
  
    --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java ---
    @@ -0,0 +1,78 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.utils.collections;
    +
    +import java.util.Iterator;
    +
    +/**
    + * Provides an Iterator that works over multiple underlying iterators.
    + *
    + * @param <T> type of the class of the iterator.
    + */
    +public class MultiIterator<T> implements Iterator<T> {
    --- End diff --
    
    `MultiIterator<I extends Iterator<T>, T> implements Iterator<T>`
    It should avoid casting  on children while manipulating `iterators` type


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246338675
  
    --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java ---
    @@ -0,0 +1,319 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.utils.collections;
    +
    +import org.apache.activemq.artemis.core.PriorityAware;
    +
    +import java.lang.reflect.Array;
    +import java.util.AbstractCollection;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Iterator;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.function.Consumer;
    +import java.util.function.Supplier;
    +import java.util.stream.Collectors;
    +
    +/**
    + * This class's purpose is to hold the the different collections used for each priority level.
    + *
    + * A supplier is required to provide the underlying collection needed when a new priority level is seen,
    + * and the end behaviour is that of the underlying collection, e.g. if set add will follow set's add semantics,
    + * if list, then list semantics.
    + *
    + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these.
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware.
    + */
    +public class PriorityCollection<T extends PriorityAware> extends AbstractCollection<T> {
    +
    +   private final Supplier<Collection<T>> supplier;
    +   private volatile PriorityHolder<T>[] priorityHolders = newPrioritySetArrayInstance(0);
    +   private volatile int size;
    +
    +   private void setArray(PriorityHolder<T>[] priorityHolders) {
    +      this.priorityHolders = priorityHolders;
    +   }
    +
    +   private PriorityHolder<T>[] getArray() {
    +      return priorityHolders;
    +   }
    +
    +
    +   public PriorityCollection(Supplier<Collection<T>> supplier) {
    +      this.supplier = supplier;
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> PriorityHolder<T>[] newPrioritySetArrayInstance(int length) {
    +      return (PriorityHolder<T>[]) Array.newInstance(PriorityHolder.class, length);
    +   }
    +
    +   @Override
    +   public int size() {
    +      return size;
    +   }
    +
    +   @Override
    +   public boolean isEmpty() {
    +      return size() == 0;
    +   }
    +
    +   public Set<Integer> getPriorites() {
    +      PriorityHolder<T>[] snapshot = getArray();
    +      return Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet());
    +   }
    +
    +   @Override
    +   public Iterator<T> iterator() {
    +      Iterator<T>[] iterators = getIterators();
    +      return new MultiIterator<>(iterators);
    +   }
    +
    +   private Iterator<T>[] getIterators() {
    +      PriorityHolder<T>[] snapshot = this.getArray();
    +      int size = snapshot.length;
    +      Iterator<T>[] iterators = newIteratorArrayInstance(size);
    +      for (int i = 0; i < size; i++) {
    +         iterators[i] = snapshot[i].getValues().iterator();
    +      }
    +      return iterators;
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> Iterator<T>[] newIteratorArrayInstance(int length) {
    +      return (Iterator<T>[]) Array.newInstance(Iterator.class, length);
    +   }
    +
    +   public ResettableIterator<T> resettableIterator() {
    +      return new MultiResettableIterator<T>(getResettableIterators());
    +   }
    +
    +   private ResettableIterator<T>[] getResettableIterators() {
    +      PriorityHolder<T>[] snapshot = this.getArray();
    +      int size = snapshot.length;
    +      ResettableIterator<T>[] iterators = newResettableIteratorArrayInstance(size);
    +      for (int i = 0; i < size; i++) {
    +         iterators[i] = new ArrayResettableIterator<>(snapshot[i].getValues().toArray());
    +      }
    +      return iterators;
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> ResettableIterator<T>[] newResettableIteratorArrayInstance(int length) {
    +      return (ResettableIterator<T>[]) Array.newInstance(ResettableIterator.class, length);
    +   }
    +
    +   @Override
    +   public void forEach(Consumer<? super T> action) {
    +      Objects.requireNonNull(action);
    +      PriorityHolder<T>[] current = getArray();
    +      int len = current.length;
    +      for (int i = 0; i < len; ++i) {
    +         current[i].getValues().forEach(action);
    +      }
    +   }
    +
    +   private Collection<T> getCollection(int priority, boolean createIfMissing) {
    +      PriorityHolder<T>[] current = getArray();
    +      int low = 0;
    +      int high = current.length - 1;
    +
    +      while (low <= high) {
    +         int mid = (low + high) >>> 1;
    +         PriorityHolder<T> midVal = current[mid];
    +
    +         if (midVal.getPriority() > priority)
    +            low = mid + 1;
    +         else if (midVal.getPriority() < priority)
    +            high = mid - 1;
    +         else
    +            return midVal.getValues(); //key found
    +      }
    +
    +      if (createIfMissing) {
    +         PriorityHolder<T>[] newArray = newPrioritySetArrayInstance(current.length + 1);
    +         if (low > 0) {
    +            System.arraycopy(current, 0, newArray, 0, low);
    +         }
    +         if (current.length - low > 0) {
    +            System.arraycopy(current, low, newArray, low + 1, current.length - low);
    +         }
    +         newArray[low] = new PriorityHolder<T>(priority, supplier);
    +         setArray(newArray);
    +         return newArray[low].getValues();
    +      }
    +      return null;
    +   }
    +
    +   @Override
    +   public synchronized boolean add(T t) {
    +      boolean result = addInternal(t);
    +      calcSize();
    +      return result;
    +   }
    +
    +   private boolean addInternal(T t) {
    +      if (t == null) return false;
    +      Collection<T> priority = getCollection(t.getPriority(), true);
    +      return priority.add(t);
    +   }
    +
    +   @Override
    +   public boolean remove(Object o) {
    +      return o instanceof PriorityAware && remove((PriorityAware) o);
    +   }
    +
    +   public synchronized boolean remove(PriorityAware priorityAware) {
    +      boolean result = removeInternal(priorityAware);
    +      calcSize();
    +      return result;
    +   }
    +
    +   private boolean removeInternal(PriorityAware priorityAware) {
    +      if ( priorityAware == null) return false;
    +      Collection<T> priority = getCollection(priorityAware.getPriority(), false);
    +      boolean result = priority != null && priority.remove(priorityAware);
    +      if (priority != null && priority.size() == 0) {
    +         removeCollection(priorityAware.getPriority());
    +      }
    +      return result;
    +   }
    +
    +   private Collection<T> removeCollection(int priority) {
    +      PriorityHolder<T>[] current = getArray();
    +      int len = current.length;
    +      int low = 0;
    +      int high = len - 1;
    +
    +      while (low <= high) {
    +         int mid = (low + high) >>> 1;
    +         PriorityHolder<T> midVal = current[mid];
    +
    +         if (midVal.getPriority() > priority)
    +            low = mid + 1;
    +         else if (midVal.getPriority() < priority)
    +            high = mid - 1;
    +         else {
    +            PriorityHolder<T>[] newArray = newPrioritySetArrayInstance(len - 1);
    +            System.arraycopy(current, 0, newArray, 0, mid);
    +            System.arraycopy(current, mid + 1, newArray, mid, len - mid - 1);
    +            setArray(newArray);
    +            return midVal.getValues(); //key found
    +         }
    +      }
    +      return null;
    +   }
    +
    +   @Override
    +   public boolean containsAll(Collection<?> c) {
    +      Objects.requireNonNull(c);
    +      for (Object e : c)
    +         if (!contains(e))
    +            return false;
    +      return true;
    +   }
    +
    +   @Override
    +   public synchronized boolean addAll(Collection<? extends T> c) {
    +      Objects.requireNonNull(c);
    +      boolean modified = false;
    +      for (T e : c)
    +         if (addInternal(e))
    +            modified = true;
    +      calcSize();
    +      return modified;
    +   }
    +
    +   @Override
    +   public synchronized boolean removeAll(Collection<?> c) {
    +      Objects.requireNonNull(c);
    +      boolean modified = false;
    +      for (Object o : c) {
    +         if (remove(o)) {
    +            modified = true;
    +         }
    +      }
    +      calcSize();
    +      return modified;
    +   }
    +
    +   @Override
    +   public synchronized boolean retainAll(Collection<?> c) {
    +      Objects.requireNonNull(c);
    +      boolean modified = false;
    +      PriorityHolder<T>[] snapshot = getArray();
    +      for (PriorityHolder<T> priorityHolder : snapshot) {
    +         if (priorityHolder.getValues().retainAll(c)) {
    +            modified = true;
    +         }
    +      }
    +      calcSize();
    +      return modified;
    +   }
    +
    +   @Override
    +   public synchronized void clear() {
    +      PriorityHolder<T>[] snapshot = getArray();
    +      for (PriorityHolder<T> priorityHolder : snapshot) {
    +         priorityHolder.getValues().clear();
    +      }
    +      calcSize();
    +   }
    +
    +   @Override
    +   public boolean contains(Object o) {
    +      return o instanceof PriorityAware && contains((PriorityAware) o);
    +   }
    +
    +   public boolean contains(PriorityAware priorityAware) {
    +      if (priorityAware == null) return false;
    +      Collection<T> prioritySet = getCollection(priorityAware.getPriority(), false);
    +      return prioritySet != null && prioritySet.contains(priorityAware);
    +   }
    +
    +   private void calcSize() {
    +      PriorityHolder<T>[] current = getArray();
    +      int size = 0;
    +      for (PriorityHolder<T> priorityHolder : current) {
    +         size += priorityHolder.getValues().size();
    --- End diff --
    
    `size` can be > `Intever.MAX_VALUE`?
    I know that in the original code wasn't handled, it is just a remind that maybe we need to consider this case (or at least put a warning or adjust the value in order to be `Integer.MAX_VALUE`).


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246135818
  
    --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerPriorityTest.java ---
    @@ -0,0 +1,298 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.tests.integration.jms.client;
    +
    +import org.apache.activemq.artemis.api.core.RoutingType;
    +import org.apache.activemq.artemis.api.core.SimpleString;
    +import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
    +import org.apache.activemq.artemis.tests.util.JMSTestBase;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +import javax.jms.Connection;
    +import javax.jms.ConnectionFactory;
    +import javax.jms.Destination;
    +import javax.jms.MessageConsumer;
    +import javax.jms.MessageProducer;
    +import javax.jms.Queue;
    +import javax.jms.Session;
    +import javax.jms.TextMessage;
    +import javax.jms.Topic;
    +
    +/**
    + * Exclusive Test
    + */
    +public class ConsumerPriorityTest extends JMSTestBase {
    +
    +   private SimpleString queueName = SimpleString.toSimpleString("jms.consumer.priority.queue");
    --- End diff --
    
    now using getName ... again nice nice....


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by gemmellr <gi...@git.apache.org>.
Github user gemmellr commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r245973668
  
    --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java ---
    @@ -0,0 +1,90 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.tests.integration.amqp;
    +
    +import org.apache.activemq.transport.amqp.client.AmqpClient;
    +import org.apache.activemq.transport.amqp.client.AmqpConnection;
    +import org.apache.activemq.transport.amqp.client.AmqpMessage;
    +import org.apache.activemq.transport.amqp.client.AmqpReceiver;
    +import org.apache.activemq.transport.amqp.client.AmqpSession;
    +import org.apache.qpid.proton.amqp.Symbol;
    +import org.junit.Test;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Test various behaviors of AMQP receivers with the broker.
    + */
    +public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
    +
    +   @Test(timeout = 30000)
    +   public void testPriority() throws Exception {
    +
    +      AmqpClient client = createAmqpClient();
    +      AmqpConnection connection = addConnection(client.connect());
    +      AmqpSession session = connection.createSession();
    +
    +      Map<Symbol, Object> properties1 = new HashMap<>();
    +      properties1.put(Symbol.getSymbol("priority"), 50);
    +      AmqpReceiver receiver1 = session.createReceiver(getQueueName(), null, false, false, properties1);
    +      receiver1.flow(100);
    +
    +      Map<Symbol, Object> properties2 = new HashMap<>();
    +      properties2.put(Symbol.getSymbol("priority"), 10);
    +      AmqpReceiver receiver2 = session.createReceiver(getQueueName(), null, false, false, properties2);
    +      receiver2.flow(100);
    +
    +      Map<Symbol, Object> properties3 = new HashMap<>();
    +      properties3.put(Symbol.getSymbol("priority"), 5);
    +      AmqpReceiver receiver3 = session.createReceiver(getQueueName(), null, false, false, properties3);
    +      receiver3.flow(100);
    +
    +      sendMessages(getQueueName(), 5);
    +
    +
    +      for (int i = 0; i < 5; i++) {
    +         AmqpMessage message1 = receiver1.receive(250, TimeUnit.MILLISECONDS);
    +         AmqpMessage message2 = receiver2.receive(250, TimeUnit.MILLISECONDS);
    --- End diff --
    
    Burning 250ms twice per loop seems excessive. There is a receiveNoWait that could be used for initial verification nothing arrived, and/or a small final timed wait could be done outside the loop afterwards. Alternatively, pullImmediate() would avoid unnecessary waiting.


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246377289
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java ---
    @@ -2409,14 +2410,10 @@ private void deliver() {
           // Either the iterator is empty or the consumer is busy
           int noDelivery = 0;
     
    -      int size = 0;
    -
    -      int endPos = -1;
    -
           int handled = 0;
     
           long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT;
    -
    +      consumers.reset();
    --- End diff --
    
    if any exception would be thrown before it is ok that `reset` won't be called? 
    If not, better to wrap the whole logic with ìtry..finally` and `reset` 


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by gemmellr <gi...@git.apache.org>.
Github user gemmellr commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246403272
  
    --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/QueueConsumerPriorityTest.java ---
    @@ -0,0 +1,65 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.tests.integration.openwire.amq;
    +
    +import javax.jms.JMSException;
    +import javax.jms.Message;
    +import javax.jms.MessageConsumer;
    +import javax.jms.MessageProducer;
    +import javax.jms.Session;
    +
    +
    +import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
    +import org.apache.activemq.command.ActiveMQQueue;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +public class QueueConsumerPriorityTest extends BasicOpenWireTest {
    +
    +
    +   @Override
    +   @Before
    +   public void setUp() throws Exception {
    +      super.setUp();
    +      this.makeSureCoreQueueExist("QUEUE.A");
    +   }
    +   @Test
    +   public void testQueueConsumerPriority() throws JMSException, InterruptedException {
    +      connection.start();
    +      Session consumerLowPriority = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    +      Session consumerHighPriority = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    +      assertNotNull(consumerHighPriority);
    +      Session senderSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    +      String queueName = "QUEUE.A";
    +      ActiveMQQueue low = new ActiveMQQueue(queueName + "?consumer.priority=1");
    +      MessageConsumer lowConsumer = consumerLowPriority.createConsumer(low);
    +
    +      ActiveMQQueue high = new ActiveMQQueue(queueName + "?consumer.priority=2");
    +      MessageConsumer highConsumer = consumerLowPriority.createConsumer(high);
    +
    +      ActiveMQQueue senderQueue = new ActiveMQQueue(queueName);
    +
    +      MessageProducer producer = senderSession.createProducer(senderQueue);
    +
    +      Message msg = senderSession.createTextMessage("test");
    +      for (int i = 0; i < 1000; i++) {
    +         producer.send(msg);
    +         assertNotNull("null on iteration: " + i, highConsumer.receive(1000));
    +      }
    +      assertNull(lowConsumer.receive(2000));
    --- End diff --
    
    I don't personally think this is a case which warrants keeping poor behaviour 'for consistency' when there are various essentially equivalent checks/assertions the test could do which don't require wasting 2 seconds. Having maybe run it once to ensure it worked, I'd change it.
    
    The ActiveMQ 5 test suite is an even better example of a test suite so slow (due to things like this) that folks don't actually want to run it.


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246313269
  
    --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java ---
    @@ -0,0 +1,78 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.utils.collections;
    +
    +import java.util.Iterator;
    +
    +/**
    + * Provides an Iterator that works over multiple underlying iterators.
    + *
    + * @param <T> type of the class of the iterator.
    + */
    +public class MultiIterator<T> implements Iterator<T> {
    +
    +   private final Iterator<T>[] iterators;
    +   int index = -1;
    --- End diff --
    
    `private` given that seems that isn't needed by a child


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246574781
  
    --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java ---
    @@ -0,0 +1,319 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.utils.collections;
    +
    +import org.apache.activemq.artemis.core.PriorityAware;
    +
    +import java.lang.reflect.Array;
    +import java.util.AbstractCollection;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Iterator;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.function.Consumer;
    +import java.util.function.Supplier;
    +import java.util.stream.Collectors;
    +
    +/**
    + * This class's purpose is to hold the the different collections used for each priority level.
    + *
    + * A supplier is required to provide the underlying collection needed when a new priority level is seen,
    + * and the end behaviour is that of the underlying collection, e.g. if set add will follow set's add semantics,
    + * if list, then list semantics.
    + *
    + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these.
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware.
    + */
    +public class PriorityCollection<T extends PriorityAware> extends AbstractCollection<T> {
    +
    +   private final Supplier<Collection<T>> supplier;
    +   private volatile PriorityHolder<T>[] priorityHolders = newPrioritySetArrayInstance(0);
    +   private volatile int size;
    +
    +   private void setArray(PriorityHolder<T>[] priorityHolders) {
    +      this.priorityHolders = priorityHolders;
    +   }
    +
    +   private PriorityHolder<T>[] getArray() {
    +      return priorityHolders;
    +   }
    +
    +
    +   public PriorityCollection(Supplier<Collection<T>> supplier) {
    +      this.supplier = supplier;
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> PriorityHolder<T>[] newPrioritySetArrayInstance(int length) {
    +      return (PriorityHolder<T>[]) Array.newInstance(PriorityHolder.class, length);
    +   }
    +
    +   @Override
    +   public int size() {
    +      return size;
    +   }
    +
    +   @Override
    +   public boolean isEmpty() {
    +      return size() == 0;
    +   }
    +
    +   public Set<Integer> getPriorites() {
    +      PriorityHolder<T>[] snapshot = getArray();
    +      return Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet());
    +   }
    +
    +   @Override
    +   public Iterator<T> iterator() {
    +      Iterator<T>[] iterators = getIterators();
    +      return new MultiIterator<>(iterators);
    +   }
    +
    +   private Iterator<T>[] getIterators() {
    +      PriorityHolder<T>[] snapshot = this.getArray();
    +      int size = snapshot.length;
    +      Iterator<T>[] iterators = newIteratorArrayInstance(size);
    +      for (int i = 0; i < size; i++) {
    +         iterators[i] = snapshot[i].getValues().iterator();
    +      }
    +      return iterators;
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> Iterator<T>[] newIteratorArrayInstance(int length) {
    +      return (Iterator<T>[]) Array.newInstance(Iterator.class, length);
    +   }
    +
    +   public ResettableIterator<T> resettableIterator() {
    +      return new MultiResettableIterator<T>(getResettableIterators());
    +   }
    +
    +   private ResettableIterator<T>[] getResettableIterators() {
    +      PriorityHolder<T>[] snapshot = this.getArray();
    +      int size = snapshot.length;
    +      ResettableIterator<T>[] iterators = newResettableIteratorArrayInstance(size);
    +      for (int i = 0; i < size; i++) {
    +         iterators[i] = new ArrayResettableIterator<>(snapshot[i].getValues().toArray());
    +      }
    +      return iterators;
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> ResettableIterator<T>[] newResettableIteratorArrayInstance(int length) {
    +      return (ResettableIterator<T>[]) Array.newInstance(ResettableIterator.class, length);
    +   }
    +
    +   @Override
    +   public void forEach(Consumer<? super T> action) {
    +      Objects.requireNonNull(action);
    +      PriorityHolder<T>[] current = getArray();
    +      int len = current.length;
    +      for (int i = 0; i < len; ++i) {
    +         current[i].getValues().forEach(action);
    +      }
    +   }
    +
    +   private Collection<T> getCollection(int priority, boolean createIfMissing) {
    +      PriorityHolder<T>[] current = getArray();
    +      int low = 0;
    +      int high = current.length - 1;
    +
    +      while (low <= high) {
    +         int mid = (low + high) >>> 1;
    +         PriorityHolder<T> midVal = current[mid];
    +
    +         if (midVal.getPriority() > priority)
    +            low = mid + 1;
    +         else if (midVal.getPriority() < priority)
    +            high = mid - 1;
    +         else
    +            return midVal.getValues(); //key found
    +      }
    +
    +      if (createIfMissing) {
    +         PriorityHolder<T>[] newArray = newPrioritySetArrayInstance(current.length + 1);
    +         if (low > 0) {
    +            System.arraycopy(current, 0, newArray, 0, low);
    +         }
    +         if (current.length - low > 0) {
    +            System.arraycopy(current, low, newArray, low + 1, current.length - low);
    +         }
    +         newArray[low] = new PriorityHolder<T>(priority, supplier);
    +         setArray(newArray);
    +         return newArray[low].getValues();
    +      }
    +      return null;
    +   }
    +
    +   @Override
    +   public synchronized boolean add(T t) {
    +      boolean result = addInternal(t);
    +      calcSize();
    +      return result;
    +   }
    +
    +   private boolean addInternal(T t) {
    +      if (t == null) return false;
    +      Collection<T> priority = getCollection(t.getPriority(), true);
    +      return priority.add(t);
    +   }
    +
    +   @Override
    +   public boolean remove(Object o) {
    +      return o instanceof PriorityAware && remove((PriorityAware) o);
    +   }
    +
    +   public synchronized boolean remove(PriorityAware priorityAware) {
    +      boolean result = removeInternal(priorityAware);
    +      calcSize();
    +      return result;
    +   }
    +
    +   private boolean removeInternal(PriorityAware priorityAware) {
    +      if ( priorityAware == null) return false;
    +      Collection<T> priority = getCollection(priorityAware.getPriority(), false);
    +      boolean result = priority != null && priority.remove(priorityAware);
    +      if (priority != null && priority.size() == 0) {
    +         removeCollection(priorityAware.getPriority());
    +      }
    +      return result;
    +   }
    +
    +   private Collection<T> removeCollection(int priority) {
    +      PriorityHolder<T>[] current = getArray();
    +      int len = current.length;
    +      int low = 0;
    +      int high = len - 1;
    +
    +      while (low <= high) {
    +         int mid = (low + high) >>> 1;
    --- End diff --
    
    still just a coding preference, i see you even do bitwise alot when can be just written in standard math ;) ..... 
    
    anyhow have changed to avoid disagreement.


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246377787
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java ---
    @@ -2497,45 +2484,36 @@ private void deliver() {
     
     
                       handled++;
    -
    +                  consumers.reset();
                       continue;
                    }
     
                    if (logger.isTraceEnabled()) {
                       logger.trace("Queue " + this.getName() + " is delivering reference " + ref);
                    }
     
    -               // If a group id is set, then this overrides the consumer chosen round-robin
    -
    -               SimpleString groupID = extractGroupID(ref);
    -
    -               if (groupID != null) {
    -                  groupConsumer = groups.get(groupID);
    +               final SimpleString groupID = extractGroupID(ref);
    +               groupConsumer = getGroupConsumer(groupConsumer, groupID);
     
    -                  if (groupConsumer != null) {
    -                     consumer = groupConsumer;
    -                  }
    -               }
    -
    -               if (exclusive && redistributor == null) {
    -                  consumer = consumerList.get(0).consumer;
    +               if (groupConsumer != null) {
    +                  consumer = groupConsumer;
                    }
     
                    HandleStatus status = handle(ref, consumer);
     
                    if (status == HandleStatus.HANDLED) {
     
    -                  deliveriesInTransit.countUp();
    -
    -                  handledconsumer = consumer;
    -
    -                  removeMessageReference(holder, ref);
    -
                       if (redistributor == null) {
                          handleMessageGroup(ref, consumer, groupConsumer, groupID);
                       }
     
    +                  deliveriesInTransit.countUp();
    +
    +
    +                  removeMessageReference(holder, ref);
    +                  handledconsumer = consumer;
                       handled++;
    +                  consumers.reset();
    --- End diff --
    
    if `removeMessageReference` would throw any exeption is fine to have `consumers.reset` not called?
    if not, uses `try...finally`


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246520121
  
    --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java ---
    @@ -0,0 +1,319 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.utils.collections;
    +
    +import org.apache.activemq.artemis.core.PriorityAware;
    +
    +import java.lang.reflect.Array;
    +import java.util.AbstractCollection;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Iterator;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.function.Consumer;
    +import java.util.function.Supplier;
    +import java.util.stream.Collectors;
    +
    +/**
    + * This class's purpose is to hold the the different collections used for each priority level.
    + *
    + * A supplier is required to provide the underlying collection needed when a new priority level is seen,
    + * and the end behaviour is that of the underlying collection, e.g. if set add will follow set's add semantics,
    + * if list, then list semantics.
    + *
    + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these.
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware.
    + */
    +public class PriorityCollection<T extends PriorityAware> extends AbstractCollection<T> {
    +
    +   private final Supplier<Collection<T>> supplier;
    +   private volatile PriorityHolder<T>[] priorityHolders = newPrioritySetArrayInstance(0);
    +   private volatile int size;
    +
    +   private void setArray(PriorityHolder<T>[] priorityHolders) {
    +      this.priorityHolders = priorityHolders;
    +   }
    +
    +   private PriorityHolder<T>[] getArray() {
    +      return priorityHolders;
    +   }
    +
    +
    +   public PriorityCollection(Supplier<Collection<T>> supplier) {
    +      this.supplier = supplier;
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> PriorityHolder<T>[] newPrioritySetArrayInstance(int length) {
    +      return (PriorityHolder<T>[]) Array.newInstance(PriorityHolder.class, length);
    +   }
    +
    +   @Override
    +   public int size() {
    +      return size;
    +   }
    +
    +   @Override
    +   public boolean isEmpty() {
    +      return size() == 0;
    +   }
    +
    +   public Set<Integer> getPriorites() {
    +      PriorityHolder<T>[] snapshot = getArray();
    +      return Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet());
    +   }
    +
    +   @Override
    +   public Iterator<T> iterator() {
    +      Iterator<T>[] iterators = getIterators();
    +      return new MultiIterator<>(iterators);
    +   }
    +
    +   private Iterator<T>[] getIterators() {
    +      PriorityHolder<T>[] snapshot = this.getArray();
    +      int size = snapshot.length;
    +      Iterator<T>[] iterators = newIteratorArrayInstance(size);
    +      for (int i = 0; i < size; i++) {
    +         iterators[i] = snapshot[i].getValues().iterator();
    +      }
    +      return iterators;
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> Iterator<T>[] newIteratorArrayInstance(int length) {
    +      return (Iterator<T>[]) Array.newInstance(Iterator.class, length);
    +   }
    +
    +   public ResettableIterator<T> resettableIterator() {
    +      return new MultiResettableIterator<T>(getResettableIterators());
    +   }
    +
    +   private ResettableIterator<T>[] getResettableIterators() {
    +      PriorityHolder<T>[] snapshot = this.getArray();
    +      int size = snapshot.length;
    +      ResettableIterator<T>[] iterators = newResettableIteratorArrayInstance(size);
    +      for (int i = 0; i < size; i++) {
    +         iterators[i] = new ArrayResettableIterator<>(snapshot[i].getValues().toArray());
    +      }
    +      return iterators;
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> ResettableIterator<T>[] newResettableIteratorArrayInstance(int length) {
    +      return (ResettableIterator<T>[]) Array.newInstance(ResettableIterator.class, length);
    +   }
    +
    +   @Override
    +   public void forEach(Consumer<? super T> action) {
    +      Objects.requireNonNull(action);
    +      PriorityHolder<T>[] current = getArray();
    +      int len = current.length;
    +      for (int i = 0; i < len; ++i) {
    +         current[i].getValues().forEach(action);
    +      }
    +   }
    +
    +   private Collection<T> getCollection(int priority, boolean createIfMissing) {
    +      PriorityHolder<T>[] current = getArray();
    +      int low = 0;
    +      int high = current.length - 1;
    +
    +      while (low <= high) {
    +         int mid = (low + high) >>> 1;
    +         PriorityHolder<T> midVal = current[mid];
    +
    +         if (midVal.getPriority() > priority)
    +            low = mid + 1;
    +         else if (midVal.getPriority() < priority)
    +            high = mid - 1;
    +         else
    +            return midVal.getValues(); //key found
    +      }
    +
    +      if (createIfMissing) {
    +         PriorityHolder<T>[] newArray = newPrioritySetArrayInstance(current.length + 1);
    +         if (low > 0) {
    +            System.arraycopy(current, 0, newArray, 0, low);
    +         }
    +         if (current.length - low > 0) {
    +            System.arraycopy(current, low, newArray, low + 1, current.length - low);
    +         }
    +         newArray[low] = new PriorityHolder<T>(priority, supplier);
    +         setArray(newArray);
    +         return newArray[low].getValues();
    +      }
    +      return null;
    +   }
    +
    +   @Override
    +   public synchronized boolean add(T t) {
    +      boolean result = addInternal(t);
    +      calcSize();
    +      return result;
    +   }
    +
    +   private boolean addInternal(T t) {
    +      if (t == null) return false;
    +      Collection<T> priority = getCollection(t.getPriority(), true);
    +      return priority.add(t);
    +   }
    +
    +   @Override
    +   public boolean remove(Object o) {
    +      return o instanceof PriorityAware && remove((PriorityAware) o);
    +   }
    +
    +   public synchronized boolean remove(PriorityAware priorityAware) {
    +      boolean result = removeInternal(priorityAware);
    +      calcSize();
    +      return result;
    +   }
    +
    +   private boolean removeInternal(PriorityAware priorityAware) {
    +      if ( priorityAware == null) return false;
    +      Collection<T> priority = getCollection(priorityAware.getPriority(), false);
    +      boolean result = priority != null && priority.remove(priorityAware);
    +      if (priority != null && priority.size() == 0) {
    +         removeCollection(priorityAware.getPriority());
    +      }
    +      return result;
    +   }
    +
    +   private Collection<T> removeCollection(int priority) {
    +      PriorityHolder<T>[] current = getArray();
    +      int len = current.length;
    +      int low = 0;
    +      int high = len - 1;
    +
    +      while (low <= high) {
    +         int mid = (low + high) >>> 1;
    +         PriorityHolder<T> midVal = current[mid];
    +
    +         if (midVal.getPriority() > priority)
    +            low = mid + 1;
    +         else if (midVal.getPriority() < priority)
    +            high = mid - 1;
    +         else {
    +            PriorityHolder<T>[] newArray = newPrioritySetArrayInstance(len - 1);
    +            System.arraycopy(current, 0, newArray, 0, mid);
    +            System.arraycopy(current, mid + 1, newArray, mid, len - mid - 1);
    +            setArray(newArray);
    +            return midVal.getValues(); //key found
    +         }
    +      }
    +      return null;
    +   }
    +
    +   @Override
    +   public boolean containsAll(Collection<?> c) {
    +      Objects.requireNonNull(c);
    +      for (Object e : c)
    +         if (!contains(e))
    +            return false;
    +      return true;
    +   }
    +
    +   @Override
    +   public synchronized boolean addAll(Collection<? extends T> c) {
    +      Objects.requireNonNull(c);
    +      boolean modified = false;
    +      for (T e : c)
    +         if (addInternal(e))
    +            modified = true;
    +      calcSize();
    +      return modified;
    +   }
    +
    +   @Override
    +   public synchronized boolean removeAll(Collection<?> c) {
    +      Objects.requireNonNull(c);
    +      boolean modified = false;
    +      for (Object o : c) {
    +         if (remove(o)) {
    +            modified = true;
    +         }
    +      }
    +      calcSize();
    +      return modified;
    +   }
    +
    +   @Override
    +   public synchronized boolean retainAll(Collection<?> c) {
    +      Objects.requireNonNull(c);
    +      boolean modified = false;
    +      PriorityHolder<T>[] snapshot = getArray();
    +      for (PriorityHolder<T> priorityHolder : snapshot) {
    +         if (priorityHolder.getValues().retainAll(c)) {
    +            modified = true;
    +         }
    +      }
    +      calcSize();
    +      return modified;
    +   }
    +
    +   @Override
    +   public synchronized void clear() {
    +      PriorityHolder<T>[] snapshot = getArray();
    +      for (PriorityHolder<T> priorityHolder : snapshot) {
    +         priorityHolder.getValues().clear();
    +      }
    +      calcSize();
    +   }
    +
    +   @Override
    +   public boolean contains(Object o) {
    +      return o instanceof PriorityAware && contains((PriorityAware) o);
    +   }
    +
    +   public boolean contains(PriorityAware priorityAware) {
    +      if (priorityAware == null) return false;
    +      Collection<T> prioritySet = getCollection(priorityAware.getPriority(), false);
    +      return prioritySet != null && prioritySet.contains(priorityAware);
    +   }
    +
    +   private void calcSize() {
    +      PriorityHolder<T>[] current = getArray();
    +      int size = 0;
    +      for (PriorityHolder<T> priorityHolder : current) {
    +         size += priorityHolder.getValues().size();
    --- End diff --
    
    If we have a case of the number of active consumers on a single queue is integer max value we will have many areas to address tbh! And that is some super extreme case.
    
    We can reject on adding a new consumer if adding the new consumer would breach int max value, if we are really worried


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by gemmellr <gi...@git.apache.org>.
Github user gemmellr commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r245973707
  
    --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java ---
    @@ -0,0 +1,90 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.tests.integration.amqp;
    +
    +import org.apache.activemq.transport.amqp.client.AmqpClient;
    +import org.apache.activemq.transport.amqp.client.AmqpConnection;
    +import org.apache.activemq.transport.amqp.client.AmqpMessage;
    +import org.apache.activemq.transport.amqp.client.AmqpReceiver;
    +import org.apache.activemq.transport.amqp.client.AmqpSession;
    +import org.apache.qpid.proton.amqp.Symbol;
    +import org.junit.Test;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Test various behaviors of AMQP receivers with the broker.
    + */
    +public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
    +
    +   @Test(timeout = 30000)
    +   public void testPriority() throws Exception {
    +
    +      AmqpClient client = createAmqpClient();
    +      AmqpConnection connection = addConnection(client.connect());
    +      AmqpSession session = connection.createSession();
    +
    +      Map<Symbol, Object> properties1 = new HashMap<>();
    +      properties1.put(Symbol.getSymbol("priority"), 50);
    +      AmqpReceiver receiver1 = session.createReceiver(getQueueName(), null, false, false, properties1);
    +      receiver1.flow(100);
    +
    +      Map<Symbol, Object> properties2 = new HashMap<>();
    +      properties2.put(Symbol.getSymbol("priority"), 10);
    +      AmqpReceiver receiver2 = session.createReceiver(getQueueName(), null, false, false, properties2);
    +      receiver2.flow(100);
    +
    +      Map<Symbol, Object> properties3 = new HashMap<>();
    +      properties3.put(Symbol.getSymbol("priority"), 5);
    +      AmqpReceiver receiver3 = session.createReceiver(getQueueName(), null, false, false, properties3);
    +      receiver3.flow(100);
    +
    +      sendMessages(getQueueName(), 5);
    +
    +
    +      for (int i = 0; i < 5; i++) {
    +         AmqpMessage message1 = receiver1.receive(250, TimeUnit.MILLISECONDS);
    +         AmqpMessage message2 = receiver2.receive(250, TimeUnit.MILLISECONDS);
    +         AmqpMessage message3 = receiver3.receive(250, TimeUnit.MILLISECONDS);
    +         assertNotNull("did not receive message first time", message1);
    +         assertEquals("MessageID:" + i, message1.getMessageId());
    +         message1.accept();
    +         assertNull("message is not meant to goto lower priority receiver", message2);
    +         assertNull("message is not meant to goto lower priority receiver", message3);
    +      }
    +
    +      //Close the high priority receiver
    +      receiver1.close();
    +
    +      sendMessages(getQueueName(), 5);
    +
    +      //Check messages now goto next priority receiver
    +      for (int i = 0; i < 5; i++) {
    +         AmqpMessage message2 = receiver2.receive(250, TimeUnit.MILLISECONDS);
    +         AmqpMessage message3 = receiver3.receive(250, TimeUnit.MILLISECONDS);
    --- End diff --
    
    As above.


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246140306
  
    --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java ---
    @@ -0,0 +1,90 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.tests.integration.amqp;
    +
    +import org.apache.activemq.transport.amqp.client.AmqpClient;
    +import org.apache.activemq.transport.amqp.client.AmqpConnection;
    +import org.apache.activemq.transport.amqp.client.AmqpMessage;
    +import org.apache.activemq.transport.amqp.client.AmqpReceiver;
    +import org.apache.activemq.transport.amqp.client.AmqpSession;
    +import org.apache.qpid.proton.amqp.Symbol;
    +import org.junit.Test;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Test various behaviors of AMQP receivers with the broker.
    + */
    +public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
    +
    +   @Test(timeout = 30000)
    +   public void testPriority() throws Exception {
    +
    +      AmqpClient client = createAmqpClient();
    +      AmqpConnection connection = addConnection(client.connect());
    +      AmqpSession session = connection.createSession();
    +
    +      Map<Symbol, Object> properties1 = new HashMap<>();
    +      properties1.put(Symbol.getSymbol("priority"), 50);
    +      AmqpReceiver receiver1 = session.createReceiver(getQueueName(), null, false, false, properties1);
    +      receiver1.flow(100);
    +
    +      Map<Symbol, Object> properties2 = new HashMap<>();
    +      properties2.put(Symbol.getSymbol("priority"), 10);
    +      AmqpReceiver receiver2 = session.createReceiver(getQueueName(), null, false, false, properties2);
    +      receiver2.flow(100);
    +
    +      Map<Symbol, Object> properties3 = new HashMap<>();
    +      properties3.put(Symbol.getSymbol("priority"), 5);
    +      AmqpReceiver receiver3 = session.createReceiver(getQueueName(), null, false, false, properties3);
    +      receiver3.flow(100);
    +
    +      sendMessages(getQueueName(), 5);
    +
    +
    +      for (int i = 0; i < 5; i++) {
    +         AmqpMessage message1 = receiver1.receive(250, TimeUnit.MILLISECONDS);
    +         AmqpMessage message2 = receiver2.receive(250, TimeUnit.MILLISECONDS);
    +         AmqpMessage message3 = receiver3.receive(250, TimeUnit.MILLISECONDS);
    +         assertNotNull("did not receive message first time", message1);
    +         assertEquals("MessageID:" + i, message1.getMessageId());
    +         message1.accept();
    +         assertNull("message is not meant to goto lower priority receiver", message2);
    +         assertNull("message is not meant to goto lower priority receiver", message3);
    +      }
    +
    +      //Close the high priority receiver
    +      receiver1.close();
    +
    +      sendMessages(getQueueName(), 5);
    +
    +      //Check messages now goto next priority receiver
    +      for (int i = 0; i < 5; i++) {
    +         AmqpMessage message2 = receiver2.receive(250, TimeUnit.MILLISECONDS);
    +         AmqpMessage message3 = receiver3.receive(250, TimeUnit.MILLISECONDS);
    --- End diff --
    
    changed to receiveNoWait()


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246520970
  
    --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiResettableIterator.java ---
    @@ -0,0 +1,44 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.utils.collections;
    +
    +/**
    + * Extends MultiIterator, adding the ability if the underlying iterators are resettable, then its self can reset.
    + * It achieves this by going back to the first iterator, and as moves to another iterator it resets it.
    + *
    + * @param <T> type of the class of the iterator.
    + */
    +public class MultiResettableIterator<T> extends MultiIterator<T> implements ResettableIterator<T> {
    --- End diff --
    
    You could but this then makes using MutiIterator ugly as hell with little gain


---

[GitHub] activemq-artemis pull request #2490: V2 196

Posted by gemmellr <gi...@git.apache.org>.
Github user gemmellr commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r245955337
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java ---
    @@ -104,6 +119,11 @@ public void decodeRest(final ActiveMQBuffer buffer) {
           filterString = buffer.readNullableSimpleString();
           browseOnly = buffer.readBoolean();
           requiresResponse = buffer.readBoolean();
    +      if (buffer.readableBytes() > 0) {
    --- End diff --
    
    I assume this is to allow for old clients that don't send this value. Would a more specific version check be clearer here for later reference? Related, I'm guessing other changes already made for 2.7.0 have updated the version info since it doesn't look to change here?
    
    Also, is the reverse case safe, does an older server failing to read the additional value (seemingly always sent now) have potential to lead to any issues on older servers, i.e how might the buffer continue to be used later if at all? Should the client omit the value for older servers? (Or does the presumed version change prevent the new client working with the old server anyway? I don't know how that stuff is handled, just commenting from reading the diff here).


---