You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/04/03 15:01:44 UTC

[1/2] activemq-artemis git commit: ARTEMIS-1772 Reduce memory footprint and allocations of QueueImpl

Repository: activemq-artemis
Updated Branches:
  refs/heads/master c17f05de2 -> 650c79ee0


ARTEMIS-1772 Reduce memory footprint and allocations of QueueImpl

It includes:
- MessageReference implementations: no longer use boxed primitives or AtomicInteger
- Node: intrusive nodes no longer need a field holding a reference to themselves
- RefCountMessage: uses AtomicIntegerFieldUpdater instead of AtomicInteger


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f6e8345d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f6e8345d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f6e8345d

Branch: refs/heads/master
Commit: f6e8345dbec7a809b1efd782b22d5e8f735964e6
Parents: c17f05d
Author: Francesco Nigro <ni...@gmail.com>
Authored: Tue Mar 20 10:16:24 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Apr 3 11:01:38 2018 -0400

----------------------------------------------------------------------
 .../utils/collections/LinkedListImpl.java       | 40 ++++++----
 .../artemis/api/core/RefCountMessage.java       | 19 +++--
 .../protocol/openwire/OpenWireConnection.java   |  6 +-
 .../core/paging/cursor/PagedReferenceImpl.java  | 78 +++++++++++++-------
 .../artemis/core/server/MessageReference.java   |  8 +-
 .../core/server/impl/LastValueQueue.java        | 32 +++++---
 .../core/server/impl/MessageReferenceImpl.java  | 42 ++++++++---
 .../artemis/core/server/impl/RefsOperation.java |  4 +-
 .../impl/LoggingActiveMQServerPlugin.java       |  2 +-
 9 files changed, 149 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f6e8345d/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
index 6071324..cb20258 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
@@ -30,7 +30,7 @@ public class LinkedListImpl<E> implements LinkedList<E> {
 
    private static final int INITIAL_ITERATOR_ARRAY_SIZE = 10;
 
-   private final Node<E> head = new Node<>(null);
+   private final Node<E> head = new NodeHolder<>(null);
 
    private Node<E> tail = null;
 
@@ -91,7 +91,7 @@ public class LinkedListImpl<E> implements LinkedList<E> {
       if (ret != null) {
          removeAfter(head);
 
-         return ret.val;
+         return ret.val();
       } else {
          return null;
       }
@@ -218,29 +218,37 @@ public class LinkedListImpl<E> implements LinkedList<E> {
       throw new IllegalStateException("Cannot find iter to remove");
    }
 
+   private static final class NodeHolder<T> extends Node<T> {
+
+      private final T val;
+
+      //only the head is allowed to hold a null
+      private NodeHolder(T e) {
+         val = e;
+      }
+
+      @Override
+      protected T val() {
+         return val;
+      }
+   }
+
    public static class Node<T> {
 
       private Node<T> next;
 
       private Node<T> prev;
 
-      private final T val;
-
       private int iterCount;
 
       @SuppressWarnings("unchecked")
-      protected Node() {
-         val = (T)this;
-      }
-
-      //only the head is allowed to hold a null
-      private Node(T e) {
-         val = e;
+      protected T val() {
+         return (T) this;
       }
 
       @Override
       public String toString() {
-         return val == this ? "Intrusive Node" : "Node, value = " + val;
+         return val() == this ? "Intrusive Node" : "Node, value = " + val();
       }
 
       private static <T> Node<T> with(final T o) {
@@ -254,7 +262,7 @@ public class LinkedListImpl<E> implements LinkedList<E> {
                return node;
             }
          }
-         return new Node(o);
+         return new NodeHolder<>(o);
       }
    }
 
@@ -298,14 +306,14 @@ public class LinkedListImpl<E> implements LinkedList<E> {
             repeat = false;
 
             if (e != null) {
-               return e.val;
+               return e.val();
             } else {
                if (canAdvance()) {
                   advance();
 
                   e = getNode();
 
-                  return e.val;
+                  return e.val();
                } else {
                   throw new NoSuchElementException();
                }
@@ -326,7 +334,7 @@ public class LinkedListImpl<E> implements LinkedList<E> {
 
          repeat = false;
 
-         return e.val;
+         return e.val();
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f6e8345d/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessage.java
index 64dd44d..5754bcc 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessage.java
@@ -17,13 +17,16 @@
 
 package org.apache.activemq.artemis.api.core;
 
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 public abstract class RefCountMessage implements Message {
 
-   private final AtomicInteger durableRefCount = new AtomicInteger();
+   private static final AtomicIntegerFieldUpdater<RefCountMessage> DURABLE_REF_COUNT_UPDATER = AtomicIntegerFieldUpdater.newUpdater(RefCountMessage.class, "durableRefCount");
+   private static final AtomicIntegerFieldUpdater<RefCountMessage> REF_COUNT_UPDATER = AtomicIntegerFieldUpdater.newUpdater(RefCountMessage.class, "refCount");
 
-   private final AtomicInteger refCount = new AtomicInteger();
+   private volatile int durableRefCount = 0;
+
+   private volatile int refCount = 0;
 
    private RefCountMessageListener context;
 
@@ -40,12 +43,12 @@ public abstract class RefCountMessage implements Message {
 
    @Override
    public int getRefCount() {
-      return refCount.get();
+      return refCount;
    }
 
    @Override
    public int incrementRefCount() throws Exception {
-      int count = refCount.incrementAndGet();
+      int count = REF_COUNT_UPDATER.incrementAndGet(this);
       if (context != null) {
          context.nonDurableUp(this, count);
       }
@@ -54,7 +57,7 @@ public abstract class RefCountMessage implements Message {
 
    @Override
    public int incrementDurableRefCount() {
-      int count = durableRefCount.incrementAndGet();
+      int count = DURABLE_REF_COUNT_UPDATER.incrementAndGet(this);
       if (context != null) {
          context.durableUp(this, count);
       }
@@ -63,7 +66,7 @@ public abstract class RefCountMessage implements Message {
 
    @Override
    public int decrementDurableRefCount() {
-      int count = durableRefCount.decrementAndGet();
+      int count = DURABLE_REF_COUNT_UPDATER.decrementAndGet(this);
       if (context != null) {
          context.durableDown(this, count);
       }
@@ -72,7 +75,7 @@ public abstract class RefCountMessage implements Message {
 
    @Override
    public int decrementRefCount() throws Exception {
-      int count = refCount.decrementAndGet();
+      int count = REF_COUNT_UPDATER.decrementAndGet(this);
       if (context != null) {
          context.nonDurableDown(this, count);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f6e8345d/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 499fb4b..f671671 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -1269,11 +1269,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
             for (ListIterator<MessageReference> referenceIterator = ackRefs.listIterator(ackRefs.size()); referenceIterator.hasPrevious(); ) {
                MessageReference ref = referenceIterator.previous();
 
-               Long consumerID = ref.getConsumerId();
-
                ServerConsumer consumer = null;
-               if (consumerID != null) {
-                  consumer = session.getCoreSession().locateConsumer(consumerID);
+               if (ref.hasConsumerId()) {
+                  consumer = session.getCoreSession().locateConsumer(ref.getConsumerId());
                }
 
                if (consumer != null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f6e8345d/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
index 23f01f9..9a37bd8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
@@ -17,7 +17,7 @@
 package org.apache.activemq.artemis.core.paging.cursor;
 
 import java.lang.ref.WeakReference;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
@@ -33,19 +33,26 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
 
    private static final Logger logger = Logger.getLogger(PagedReferenceImpl.class);
 
+   private static final AtomicIntegerFieldUpdater<PagedReferenceImpl> DELIVERY_COUNT_UPDATER = AtomicIntegerFieldUpdater
+      .newUpdater(PagedReferenceImpl.class, "deliveryCount");
+
    private final PagePosition position;
 
    private WeakReference<PagedMessage> message;
 
-   private Long deliveryTime = null;
+   private static final long UNDEFINED_DELIVERY_TIME = Long.MIN_VALUE;
+   private long deliveryTime = UNDEFINED_DELIVERY_TIME;
 
    private int persistedCount;
 
    private int messageEstimate = -1;
 
-   private Long consumerId;
+   private long consumerID;
+
+   private boolean hasConsumerID = false;
 
-   private final AtomicInteger deliveryCount = new AtomicInteger(0);
+   @SuppressWarnings("unused")
+   private volatile int deliveryCount = 0;
 
    private final PageSubscription subscription;
 
@@ -53,7 +60,11 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
 
    private Object protocolData;
 
-   private Boolean largeMessage;
+   //0 is false, 1 is true, 2 not defined
+   private static final byte IS_NOT_LARGE_MESSAGE = 0;
+   private static final byte IS_LARGE_MESSAGE = 1;
+   private static final byte UNDEFINED_IS_LARGE_MESSAGE = 2;
+   private byte largeMessage;
 
    private long transactionID = -1;
 
@@ -104,14 +115,14 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
       this.message = new WeakReference<>(message);
       this.subscription = subscription;
       if (message != null) {
-         this.largeMessage = message.getMessage().isLargeMessage();
+         this.largeMessage = message.getMessage().isLargeMessage() ? IS_LARGE_MESSAGE : IS_NOT_LARGE_MESSAGE;
          this.transactionID = message.getTransactionID();
          this.messageID = message.getMessage().getMessageID();
 
          //pre-cache the message size so we don't have to reload the message later if it is GC'd
          getPersistentSize();
       } else {
-         this.largeMessage = null;
+         this.largeMessage = UNDEFINED_IS_LARGE_MESSAGE;
          this.transactionID = -1;
          this.messageID = -1;
          this.messageSize = -1;
@@ -152,7 +163,7 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
 
    @Override
    public long getScheduledDeliveryTime() {
-      if (deliveryTime == null) {
+      if (deliveryTime == UNDEFINED_DELIVERY_TIME) {
          try {
             Message msg = getMessage();
             return msg.getScheduledDeliveryTime();
@@ -166,31 +177,31 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
 
    @Override
    public void setScheduledDeliveryTime(final long scheduledDeliveryTime) {
+      assert scheduledDeliveryTime != UNDEFINED_DELIVERY_TIME : "can't use a reserved value";
       deliveryTime = scheduledDeliveryTime;
    }
 
    @Override
    public int getDeliveryCount() {
-      return deliveryCount.get();
+      return DELIVERY_COUNT_UPDATER.get(this);
    }
 
    @Override
    public void setDeliveryCount(final int deliveryCount) {
-      this.deliveryCount.set(deliveryCount);
+      DELIVERY_COUNT_UPDATER.set(this, deliveryCount);
    }
 
    @Override
    public void incrementDeliveryCount() {
-      deliveryCount.incrementAndGet();
+      DELIVERY_COUNT_UPDATER.incrementAndGet(this);
       if (logger.isTraceEnabled()) {
          logger.trace("++deliveryCount = " + deliveryCount + " for " + this, new Exception("trace"));
       }
-
    }
 
    @Override
    public void decrementDeliveryCount() {
-      deliveryCount.decrementAndGet();
+      DELIVERY_COUNT_UPDATER.decrementAndGet(this);
       if (logger.isTraceEnabled()) {
          logger.trace("--deliveryCount = " + deliveryCount + " for " + this, new Exception("trace"));
       }
@@ -251,7 +262,7 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
          ", message=" +
          msgToString +
          ", deliveryTime=" +
-         deliveryTime +
+         (deliveryTime == UNDEFINED_DELIVERY_TIME ? null : deliveryTime) +
          ", persistedCount=" +
          persistedCount +
          ", deliveryCount=" +
@@ -261,28 +272,41 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
          "]";
    }
 
-   /* (non-Javadoc)
-    * @see org.apache.activemq.artemis.core.server.MessageReference#setConsumerId(java.lang.Long)
-    */
    @Override
-   public void setConsumerId(Long consumerID) {
-      this.consumerId = consumerID;
+   public void emptyConsumerID() {
+      this.hasConsumerID = false;
+   }
+
+   @Override
+   public void setConsumerId(long consumerID) {
+      this.hasConsumerID = true;
+      this.consumerID = consumerID;
    }
 
-   /* (non-Javadoc)
-    * @see org.apache.activemq.artemis.core.server.MessageReference#getConsumerId()
-    */
    @Override
-   public Long getConsumerId() {
-      return this.consumerId;
+   public boolean hasConsumerId() {
+      return hasConsumerID;
+   }
+
+   @Override
+   public long getConsumerId() {
+      if (!this.hasConsumerID) {
+         throw new IllegalStateException("consumerID isn't specified: please check hasConsumerId first");
+      }
+      return this.consumerID;
    }
 
    @Override
    public boolean isLargeMessage() {
-      if (largeMessage == null && message != null) {
-         largeMessage = getMessage().isLargeMessage();
+      if (largeMessage == UNDEFINED_IS_LARGE_MESSAGE && message != null) {
+         initializeIsLargeMessage();
       }
-      return largeMessage;
+      return largeMessage == IS_LARGE_MESSAGE;
+   }
+
+   private void initializeIsLargeMessage() {
+      assert largeMessage == UNDEFINED_IS_LARGE_MESSAGE && message != null;
+      largeMessage = getMessage().isLargeMessage() ? IS_LARGE_MESSAGE : IS_NOT_LARGE_MESSAGE;
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f6e8345d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
index d9145b1..0db84c5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
@@ -91,9 +91,13 @@ public interface MessageReference {
 
    void acknowledge(Transaction tx, AckReason reason) throws Exception;
 
-   void setConsumerId(Long consumerID);
+   void emptyConsumerID();
 
-   Long getConsumerId();
+   void setConsumerId(long consumerID);
+
+   boolean hasConsumerId();
+
+   long getConsumerId();
 
    void handled();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f6e8345d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 2620cf9..e3097d1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -175,7 +175,9 @@ public class LastValueQueue extends QueueImpl {
 
       private volatile MessageReference ref;
 
-      private Long consumerId;
+      private long consumerID;
+
+      private boolean hasConsumerID = false;
 
       HolderReference(final SimpleString prop, final MessageReference ref) {
          this.prop = prop;
@@ -309,20 +311,28 @@ public class LastValueQueue extends QueueImpl {
          return ref.getMessage().getMemoryEstimate();
       }
 
-      /* (non-Javadoc)
-       * @see org.apache.activemq.artemis.core.server.MessageReference#setConsumerId(java.lang.Long)
-       */
       @Override
-      public void setConsumerId(Long consumerID) {
-         this.consumerId = consumerID;
+      public void emptyConsumerID() {
+         this.hasConsumerID = false;
+      }
+
+      @Override
+      public void setConsumerId(long consumerID) {
+         this.hasConsumerID = true;
+         this.consumerID = consumerID;
+      }
+
+      @Override
+      public boolean hasConsumerId() {
+         return hasConsumerID;
       }
 
-      /* (non-Javadoc)
-       * @see org.apache.activemq.artemis.core.server.MessageReference#getConsumerId()
-       */
       @Override
-      public Long getConsumerId() {
-         return this.consumerId;
+      public long getConsumerId() {
+         if (!this.hasConsumerID) {
+            throw new IllegalStateException("consumerID isn't specified: please check hasConsumerId first");
+         }
+         return this.consumerID;
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f6e8345d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
index 2802740..4d077ae 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
@@ -16,7 +16,7 @@
  */
 package org.apache.activemq.artemis.core.server.impl;
 
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
@@ -30,7 +30,11 @@ import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
  */
 public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceImpl> implements MessageReference {
 
-   private final AtomicInteger deliveryCount = new AtomicInteger();
+   private static final AtomicIntegerFieldUpdater<MessageReferenceImpl> DELIVERY_COUNT_UPDATER = AtomicIntegerFieldUpdater
+      .newUpdater(MessageReferenceImpl.class, "deliveryCount");
+
+   @SuppressWarnings("unused")
+   private volatile int deliveryCount = 0;
 
    private volatile int persistedCount;
 
@@ -40,7 +44,9 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
 
    private final Queue queue;
 
-   private Long consumerID;
+   private long consumerID;
+
+   private boolean hasConsumerID = false;
 
    private boolean alreadyAcked;
 
@@ -59,7 +65,7 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
    }
 
    public MessageReferenceImpl(final MessageReferenceImpl other, final Queue queue) {
-      deliveryCount.set(other.deliveryCount.get());
+      DELIVERY_COUNT_UPDATER.set(this, other.getDeliveryCount());
 
       scheduledDeliveryTime = other.scheduledDeliveryTime;
 
@@ -113,23 +119,23 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
 
    @Override
    public int getDeliveryCount() {
-      return deliveryCount.get();
+      return DELIVERY_COUNT_UPDATER.get(this);
    }
 
    @Override
    public void setDeliveryCount(final int deliveryCount) {
-      this.deliveryCount.set(deliveryCount);
-      this.persistedCount = this.deliveryCount.get();
+      DELIVERY_COUNT_UPDATER.set(this, deliveryCount);
+      this.persistedCount = deliveryCount;
    }
 
    @Override
    public void incrementDeliveryCount() {
-      deliveryCount.incrementAndGet();
+      DELIVERY_COUNT_UPDATER.incrementAndGet(this);
    }
 
    @Override
    public void decrementDeliveryCount() {
-      deliveryCount.decrementAndGet();
+      DELIVERY_COUNT_UPDATER.decrementAndGet(this);
    }
 
    @Override
@@ -197,12 +203,26 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
    }
 
    @Override
-   public void setConsumerId(Long consumerID) {
+   public void emptyConsumerID() {
+      this.hasConsumerID = false;
+   }
+
+   @Override
+   public void setConsumerId(long consumerID) {
+      this.hasConsumerID = true;
       this.consumerID = consumerID;
    }
 
    @Override
-   public Long getConsumerId() {
+   public boolean hasConsumerId() {
+      return hasConsumerID;
+   }
+
+   @Override
+   public long getConsumerId() {
+      if (!this.hasConsumerID) {
+         throw new IllegalStateException("consumerID isn't specified: please check hasConsumerId first");
+      }
       return this.consumerID;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f6e8345d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
index e492985..9af8704 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
@@ -81,7 +81,7 @@ public class RefsOperation extends TransactionOperationAbstract {
       List<MessageReference> ackedRefs = new ArrayList<>();
 
       for (MessageReference ref : refsToAck) {
-         ref.setConsumerId(null);
+         ref.emptyConsumerID();
 
          if (logger.isTraceEnabled()) {
             logger.trace("rolling back " + ref);
@@ -189,7 +189,7 @@ public class RefsOperation extends TransactionOperationAbstract {
    public synchronized List<MessageReference> getListOnConsumer(long consumerID) {
       List<MessageReference> list = new LinkedList<>();
       for (MessageReference ref : refsToAck) {
-         if (ref.getConsumerId() != null && ref.getConsumerId().equals(consumerID)) {
+         if (ref.hasConsumerId() && ref.getConsumerId() == consumerID) {
             list.add(ref);
          }
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f6e8345d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java
index 70bb6ca..3e66bf1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java
@@ -629,7 +629,7 @@ public class LoggingActiveMQServerPlugin implements ActiveMQServerPlugin, Serial
 
             // info level logging
             LoggingActiveMQServerPluginLogger.LOGGER.messageAcknowledged((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())),
-                                                                         (ref == null ? UNAVAILABLE : Long.toString(ref.getConsumerId())),
+                                                                         (ref == null ? UNAVAILABLE : ref.hasConsumerId() ? Long.toString(ref.getConsumerId()) : null),
                                                                          (queue == null ? UNAVAILABLE : queue.getName().toString()),
                                                                          reason);
          }


[2/2] activemq-artemis git commit: This closes #1989

Posted by cl...@apache.org.
This closes #1989


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/650c79ee
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/650c79ee
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/650c79ee

Branch: refs/heads/master
Commit: 650c79ee0f31ca317e65e94cfeb4ccf6ef2846ba
Parents: c17f05d f6e8345
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Apr 3 11:01:39 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Apr 3 11:01:39 2018 -0400

----------------------------------------------------------------------
 .../utils/collections/LinkedListImpl.java       | 40 ++++++----
 .../artemis/api/core/RefCountMessage.java       | 19 +++--
 .../protocol/openwire/OpenWireConnection.java   |  6 +-
 .../core/paging/cursor/PagedReferenceImpl.java  | 78 +++++++++++++-------
 .../artemis/core/server/MessageReference.java   |  8 +-
 .../core/server/impl/LastValueQueue.java        | 32 +++++---
 .../core/server/impl/MessageReferenceImpl.java  | 42 ++++++++---
 .../artemis/core/server/impl/RefsOperation.java |  4 +-
 .../impl/LoggingActiveMQServerPlugin.java       |  2 +-
 9 files changed, 149 insertions(+), 82 deletions(-)
----------------------------------------------------------------------