Posted to dev@activemq.apache.org by franz1981 <gi...@git.apache.org> on 2019/01/09 09:13:00 UTC
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
GitHub user franz1981 opened a pull request:
https://github.com/apache/activemq-artemis/pull/2494
ARTEMIS-2224 Reduce contention on LivePageCacheImpl
It includes:
- **lock-free LivePageCache + tests**:
LivePageCacheImpl has been reimplemented to be
lock-free, multi-producer and multi-consumer
in all of its operations.
- **Avoid unnecessary page cache queries on ack TX**:
PageSubscriptionImpl::ackTx already performs a counter update
using the message persistent size: that size can be reused on
PagePosition::setPersistentSize, avoiding querying the page cache
just to compute it.
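The second change above can be sketched as follows. This is a hedged, simplified illustration, not the PR's actual code: the types and method names (`PageCache`, `computePersistentSize`, `installCallback`) are stand-ins for the real `PageSubscriptionImpl`/`PagePosition` plumbing quoted later in this thread.

```java
import java.util.function.IntToLongFunction;

// Sketch: reuse a persistent size the ack path already computed instead of
// querying the page cache again. -1 means "not provided, compute it".
final class AckTxSketch {

   // Stand-in for the page cache lookup the real code tries to avoid.
   interface PageCache {
      long computePersistentSize(int messageNr);
   }

   static long installCallback(PageCache cache, int messageNr, long persistentSize) {
      if (persistentSize >= 0) {
         return persistentSize;                     // reuse: no cache query
      }
      return cache.computePersistentSize(messageNr); // fallback: query the cache
   }

   public static void main(String[] args) {
      PageCache cache = messageNr -> 128L;
      // The ack path already knows the size is 64, so the cache is not consulted.
      System.out.println(installCallback(cache, 0, 64L));
      // A caller without the size passes -1 and pays for the lookup.
      System.out.println(installCallback(cache, 0, -1L));
   }
}
```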
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/franz1981/activemq-artemis lock-free-paging
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/activemq-artemis/pull/2494.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2494
----
----
---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/2494#discussion_r246563763
--- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---
@@ -28,15 +28,49 @@
/**
* This is the same as PageCache, however this is for the page that's being currently written.
*/
--- End diff --
@franz1981 I'd agree with @clebertsuconic here. A bit like what I've done with priority consumers: I ended up splitting out the collections logic, which has made it cleaner and easier to reason about (and, as you've pointed out on its PR, more testable ;) )
---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/2494#discussion_r246570293
--- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java ---
@@ -864,21 +875,32 @@ private PageCursorInfo processACK(final PagePosition pos) {
return info;
}
+ private void installTXCallback(final Transaction tx, final PagePosition position) {
+ installTXCallback(tx, position, -1);
+ }
+
/**
* @param tx
* @param position
+ * @param persistentSize if negative it needs to be calculated on the fly
*/
- private void installTXCallback(final Transaction tx, final PagePosition position) {
+ private void installTXCallback(final Transaction tx, final PagePosition position, final long persistentSize) {
if (position.getRecordID() >= 0) {
// It needs to persist, otherwise the cursor will return to the fist page position
tx.setContainsPersistent();
}
PageCursorInfo info = getPageInfo(position);
PageCache cache = info.getCache();
- long size = 0;
if (cache != null) {
- size = getPersistentSize(cache.getMessage(position.getMessageNr()));
+ final long size;
+ if (persistentSize < 0) {
--- End diff --
Surely this is checking for something specific like -1, not just any negative value? If it really accepts any negative, that would be worrying. If it is a sentinel, the check should explicitly be for -1, and any other negative value should mean an illegal argument.
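The stricter validation being asked for could look like this. A hedged sketch only: the constant name `UNDEFINED` and the supplier-based fallback are assumptions, not the PR's code.

```java
import java.util.function.LongSupplier;

// Treat exactly -1 as "compute on the fly"; reject any other negative value
// instead of silently accepting it as a sentinel.
final class PersistentSizeArg {

   static final long UNDEFINED = -1L;

   static long resolve(long persistentSize, LongSupplier computeOnTheFly) {
      if (persistentSize == UNDEFINED) {
         return computeOnTheFly.getAsLong();   // explicit sentinel: compute it
      }
      if (persistentSize < 0) {
         // any other negative is a bug in the caller, not a request to compute
         throw new IllegalArgumentException(
            "persistentSize must be >= 0 or -1, got " + persistentSize);
      }
      return persistentSize;
   }

   public static void main(String[] args) {
      System.out.println(resolve(UNDEFINED, () -> 42L)); // computed on the fly
      System.out.println(resolve(10L, () -> 42L));       // reused as-is
   }
}
```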
---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/2494#discussion_r246578304
--- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
@Override
- public synchronized int getNumberOfMessages() {
- return messages.size();
+ public int getNumberOfMessages() {
+ while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+ Thread.yield();
+ continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+ }
}
@Override
- public synchronized void setMessages(PagedMessage[] messages) {
+ public void setMessages(PagedMessage[] messages) {
// This method shouldn't be called on liveCache, but we will provide the implementation for it anyway
for (PagedMessage msg : messages) {
addLiveMessage(msg);
}
}
@Override
- public synchronized PagedMessage getMessage(int messageNumber) {
- if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
- } else {
+ public PagedMessage getMessage(int messageNumber) {
+ if (messageNumber < 0) {
return null;
}
+ //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends
+ long size = lastSeenProducerIndex;
+ if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+ Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
--- End diff --
if this was a collection, this would almost certainly be throwing IllegalArgumentException.....
---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/2494#discussion_r246568579
--- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
@Override
- public synchronized int getNumberOfMessages() {
- return messages.size();
+ public int getNumberOfMessages() {
+ while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+ Thread.yield();
+ continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+ }
}
@Override
- public synchronized void setMessages(PagedMessage[] messages) {
+ public void setMessages(PagedMessage[] messages) {
// This method shouldn't be called on liveCache, but we will provide the implementation for it anyway
for (PagedMessage msg : messages) {
addLiveMessage(msg);
}
}
@Override
- public synchronized PagedMessage getMessage(int messageNumber) {
- if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
- } else {
+ public PagedMessage getMessage(int messageNumber) {
+ if (messageNumber < 0) {
return null;
}
+ //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends
+ long size = lastSeenProducerIndex;
--- End diff --
The naming of fields is misaligned: in some places it's called size, in others you call it index. This is hard to follow.
---
[GitHub] activemq-artemis issue #2494: ARTEMIS-2224 Reduce contention on LivePageCach...
Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on the issue:
https://github.com/apache/activemq-artemis/pull/2494
@michaelandrepearce @wy96f @qihongxu Please review and test :+1:
---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/2494#discussion_r246569626
--- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
@Override
- public synchronized int getNumberOfMessages() {
- return messages.size();
+ public int getNumberOfMessages() {
+ while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+ Thread.yield();
+ continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+ }
}
@Override
- public synchronized void setMessages(PagedMessage[] messages) {
+ public void setMessages(PagedMessage[] messages) {
// This method shouldn't be called on liveCache, but we will provide the implementation for it anyway
for (PagedMessage msg : messages) {
addLiveMessage(msg);
}
}
@Override
- public synchronized PagedMessage getMessage(int messageNumber) {
- if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
- } else {
+ public PagedMessage getMessage(int messageNumber) {
+ if (messageNumber < 0) {
return null;
}
+ //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends
+ long size = lastSeenProducerIndex;
+ if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+ Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
--- End diff --
I know, but in the original version it was handled that way, and it covers 2 cases: the collection is null, and the collection doesn't have enough elements.
---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/2494#discussion_r246568552
--- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
@Override
- public synchronized int getNumberOfMessages() {
- return messages.size();
+ public int getNumberOfMessages() {
+ while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+ Thread.yield();
+ continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+ }
}
@Override
- public synchronized void setMessages(PagedMessage[] messages) {
+ public void setMessages(PagedMessage[] messages) {
// This method shouldn't be called on liveCache, but we will provide the implementation for it anyway
for (PagedMessage msg : messages) {
addLiveMessage(msg);
}
}
@Override
- public synchronized PagedMessage getMessage(int messageNumber) {
- if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
- } else {
+ public PagedMessage getMessage(int messageNumber) {
+ if (messageNumber < 0) {
return null;
}
+ //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends
+ long size = lastSeenProducerIndex;
+ if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+ Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
+ return null;
+ }
+ //publish it for others consumers
+ LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
+ }
+ final AtomicChunk<PagedMessage> buffer;
+ final int offset;
+ if (messageNumber >= chunkSize) {
+ offset = messageNumber & chunkMask;
+ //slow path is moved in a separate method
+ buffer = jump(messageNumber, size);
+ } else {
+ offset = messageNumber;
+ buffer = consumerBuffer;
+ }
+ //NOTE: producerIndex is being updated before setting a new value ie on consumer side need to spin until a not null value is set
+ PagedMessage msg;
+ while ((msg = buffer.get(offset)) == null) {
+ Thread.yield();
+ }
+ return msg;
+ }
+
+ /**
+ * Implements a lock-free version of the optimization used on {@link java.util.LinkedList#get(int)} to speed up queries
+ * ie backward search of a node if needed.
+ */
+ private AtomicChunk<PagedMessage> jump(final int messageNumber, final long size) {
+ //fast division by a power of 2
+ final int jumps = messageNumber >> chunkSizeLog2;
+ //size is never allowed to be > Integer.MAX_VALUE
+ final int lastChunkIndex = (int) size >> chunkSizeLog2;
+ int requiredJumps = jumps;
+ AtomicChunk<PagedMessage> jumpBuffer = null;
+ boolean jumpForward = true;
+ int distanceFromLastChunkIndex = lastChunkIndex - jumps;
+ //it's worth to go backward from lastChunkIndex?
+ //trying first to check against the value we already have: if it won't worth, won't make sense to load the producerBuffer
+ if (distanceFromLastChunkIndex < jumps) {
+ final AtomicChunk<PagedMessage> producer = producerBuffer;
+ //producer is a potential moving, always increasing, target ie better to re-check the distance
+ distanceFromLastChunkIndex = producer.index - jumps;
+ if (distanceFromLastChunkIndex < jumps) {
+ //we're saving some jumps ie is fine to go backward from here
+ jumpBuffer = producer;
+ requiredJumps = distanceFromLastChunkIndex;
+ jumpForward = false;
+ }
+ }
+ //start from the consumer buffer only is needed
+ if (jumpBuffer == null) {
+ jumpBuffer = consumerBuffer;
+ }
+ for (int i = 0; i < requiredJumps; i++) {
+ //next chunk is always set if below a read producerIndex value
+ //previous chunk is final and can be safely read
+ jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev;
+ }
+ return jumpBuffer;
}
@Override
- public synchronized boolean isLive() {
+ public boolean isLive() {
return isLive;
}
@Override
- public synchronized void addLiveMessage(PagedMessage message) {
+ public void addLiveMessage(PagedMessage message) {
if (message.getMessage().isLargeMessage()) {
((LargeServerMessage) message.getMessage()).incrementDelayDeletionCount();
}
- this.messages.add(message);
+ while (true) {
+ final long pIndex = producerIndex;
+ if (pIndex != RESIZING) {
+ if (pIndex == Integer.MAX_VALUE) {
+ throw new IllegalStateException("can't add more then " + Integer.MAX_VALUE + " messages");
+ }
+ //load acquire the current producer buffer
+ final AtomicChunk<PagedMessage> producerBuffer = this.producerBuffer;
+ final int pOffset = (int) (pIndex & chunkMask);
+ //only the first message to a chunk can attempt to resize
+ if (pOffset == 0) {
+ if (appendChunkAndMessage(producerBuffer, pIndex, message)) {
+ return;
+ }
+ } else if (PRODUCER_INDEX_UPDATER.compareAndSet(this, pIndex, pIndex + 1)) {
+ //this.producerBuffer is the correct buffer to append a message: it is guarded by the producerIndex logic
+ //NOTE: producerIndex is being updated before setting a new value
+ producerBuffer.lazySet(pOffset, message);
+ return;
+ }
+ }
+ Thread.yield();
+ }
+ }
+
+ private boolean appendChunkAndMessage(AtomicChunk<PagedMessage> producerBuffer, long pIndex, PagedMessage message) {
+ if (!PRODUCER_INDEX_UPDATER.compareAndSet(this, pIndex, RESIZING)) {
+ return false;
+ }
+ final AtomicChunk<PagedMessage> newChunk;
+ try {
+ final int index = (int) (pIndex >> chunkSizeLog2);
+ newChunk = new AtomicChunk<>(index, producerBuffer, chunkSize);
+ } catch (OutOfMemoryError oom) {
+ //unblock producerIndex without updating it
+ PRODUCER_INDEX_UPDATER.lazySet(this, pIndex);
+ throw oom;
+ }
+ //adding the message to it
+ newChunk.lazySet(0, message);
+ //linking it to the old one, if any
+ if (producerBuffer != null) {
+ //a plain store is enough, given that producerIndex prevents any reader/writer to access it
+ producerBuffer.next = newChunk;
+ } else {
+ //it's first one
+ this.consumerBuffer = newChunk;
+ }
+ //making it the current produced one
+ this.producerBuffer = newChunk;
+ //store release any previous write and "unblock" anyone waiting resizing to finish
+ PRODUCER_INDEX_UPDATER.lazySet(this, pIndex + 1);
+ return true;
}
@Override
- public synchronized void close() {
+ public void close() {
logger.tracef("Closing %s", this);
this.isLive = false;
--- End diff --
No need: `isLive = false` is a volatile set, exactly like using the updater.
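The point being made here — that a plain assignment to a `volatile` field and an atomic field updater's `set` are both volatile stores with the same ordering guarantees — can be illustrated with a minimal stand-in class (the field name `live` is an assumption; the updater only adds value for CAS-style operations):

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Both close paths below perform a volatile store on the same field;
// the updater buys nothing extra for a plain set.
final class VolatileFlag {

   volatile int live = 1;

   private static final AtomicIntegerFieldUpdater<VolatileFlag> LIVE_UPDATER =
      AtomicIntegerFieldUpdater.newUpdater(VolatileFlag.class, "live");

   void closeWithAssignment() {
      live = 0;                    // plain assignment: still a volatile store
   }

   void closeWithUpdater() {
      LIVE_UPDATER.set(this, 0);   // equivalent volatile store via the updater
   }

   boolean isLive() {
      return live != 0;
   }

   public static void main(String[] args) {
      VolatileFlag a = new VolatileFlag();
      a.closeWithAssignment();
      VolatileFlag b = new VolatileFlag();
      b.closeWithUpdater();
      System.out.println(a.isLive() + " " + b.isLive()); // both closed
   }
}
```

Where the updater does matter is `compareAndSet`/`getAndSet`, which a plain assignment cannot express — that is exactly why `producerIndex` in this diff uses one while `isLive` does not need to.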
---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/2494#discussion_r246566042
--- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
@Override
- public synchronized int getNumberOfMessages() {
- return messages.size();
+ public int getNumberOfMessages() {
+ while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+ Thread.yield();
+ continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+ }
}
@Override
- public synchronized void setMessages(PagedMessage[] messages) {
+ public void setMessages(PagedMessage[] messages) {
// This method shouldn't be called on liveCache, but we will provide the implementation for it anyway
for (PagedMessage msg : messages) {
addLiveMessage(msg);
}
}
@Override
- public synchronized PagedMessage getMessage(int messageNumber) {
- if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
- } else {
+ public PagedMessage getMessage(int messageNumber) {
+ if (messageNumber < 0) {
return null;
}
+ //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends
+ long size = lastSeenProducerIndex;
+ if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+ Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
+ return null;
+ }
+ //publish it for others consumers
+ LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
+ }
+ final AtomicChunk<PagedMessage> buffer;
+ final int offset;
+ if (messageNumber >= chunkSize) {
+ offset = messageNumber & chunkMask;
+ //slow path is moved in a separate method
+ buffer = jump(messageNumber, size);
+ } else {
+ offset = messageNumber;
+ buffer = consumerBuffer;
+ }
+ //NOTE: producerIndex is being updated before setting a new value ie on consumer side need to spin until a not null value is set
+ PagedMessage msg;
+ while ((msg = buffer.get(offset)) == null) {
+ Thread.yield();
+ }
+ return msg;
+ }
+
+ /**
+ * Implements a lock-free version of the optimization used on {@link java.util.LinkedList#get(int)} to speed up queries
+ * ie backward search of a node if needed.
+ */
+ private AtomicChunk<PagedMessage> jump(final int messageNumber, final long size) {
+ //fast division by a power of 2
+ final int jumps = messageNumber >> chunkSizeLog2;
+ //size is never allowed to be > Integer.MAX_VALUE
+ final int lastChunkIndex = (int) size >> chunkSizeLog2;
+ int requiredJumps = jumps;
+ AtomicChunk<PagedMessage> jumpBuffer = null;
+ boolean jumpForward = true;
+ int distanceFromLastChunkIndex = lastChunkIndex - jumps;
+ //it's worth to go backward from lastChunkIndex?
+ //trying first to check against the value we already have: if it won't worth, won't make sense to load the producerBuffer
+ if (distanceFromLastChunkIndex < jumps) {
+ final AtomicChunk<PagedMessage> producer = producerBuffer;
+ //producer is a potential moving, always increasing, target ie better to re-check the distance
+ distanceFromLastChunkIndex = producer.index - jumps;
+ if (distanceFromLastChunkIndex < jumps) {
+ //we're saving some jumps ie is fine to go backward from here
+ jumpBuffer = producer;
+ requiredJumps = distanceFromLastChunkIndex;
+ jumpForward = false;
+ }
+ }
+ //start from the consumer buffer only is needed
+ if (jumpBuffer == null) {
+ jumpBuffer = consumerBuffer;
+ }
+ for (int i = 0; i < requiredJumps; i++) {
+ //next chunk is always set if below a read producerIndex value
+ //previous chunk is final and can be safely read
+ jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev;
+ }
+ return jumpBuffer;
}
@Override
- public synchronized boolean isLive() {
+ public boolean isLive() {
return isLive;
}
@Override
- public synchronized void addLiveMessage(PagedMessage message) {
+ public void addLiveMessage(PagedMessage message) {
if (message.getMessage().isLargeMessage()) {
((LargeServerMessage) message.getMessage()).incrementDelayDeletionCount();
}
- this.messages.add(message);
+ while (true) {
+ final long pIndex = producerIndex;
+ if (pIndex != RESIZING) {
+ if (pIndex == Integer.MAX_VALUE) {
+ throw new IllegalStateException("can't add more then " + Integer.MAX_VALUE + " messages");
+ }
+ //load acquire the current producer buffer
+ final AtomicChunk<PagedMessage> producerBuffer = this.producerBuffer;
+ final int pOffset = (int) (pIndex & chunkMask);
+ //only the first message to a chunk can attempt to resize
+ if (pOffset == 0) {
+ if (appendChunkAndMessage(producerBuffer, pIndex, message)) {
+ return;
+ }
+ } else if (PRODUCER_INDEX_UPDATER.compareAndSet(this, pIndex, pIndex + 1)) {
+ //this.producerBuffer is the correct buffer to append a message: it is guarded by the producerIndex logic
+ //NOTE: producerIndex is being updated before setting a new value
+ producerBuffer.lazySet(pOffset, message);
+ return;
+ }
+ }
+ Thread.yield();
+ }
+ }
+
+ private boolean appendChunkAndMessage(AtomicChunk<PagedMessage> producerBuffer, long pIndex, PagedMessage message) {
+ if (!PRODUCER_INDEX_UPDATER.compareAndSet(this, pIndex, RESIZING)) {
+ return false;
+ }
+ final AtomicChunk<PagedMessage> newChunk;
+ try {
+ final int index = (int) (pIndex >> chunkSizeLog2);
+ newChunk = new AtomicChunk<>(index, producerBuffer, chunkSize);
+ } catch (OutOfMemoryError oom) {
+ //unblock producerIndex without updating it
+ PRODUCER_INDEX_UPDATER.lazySet(this, pIndex);
+ throw oom;
+ }
+ //adding the message to it
+ newChunk.lazySet(0, message);
+ //linking it to the old one, if any
+ if (producerBuffer != null) {
+ //a plain store is enough, given that producerIndex prevents any reader/writer to access it
+ producerBuffer.next = newChunk;
+ } else {
+ //it's first one
+ this.consumerBuffer = newChunk;
+ }
+ //making it the current produced one
+ this.producerBuffer = newChunk;
+ //store release any previous write and "unblock" anyone waiting resizing to finish
+ PRODUCER_INDEX_UPDATER.lazySet(this, pIndex + 1);
+ return true;
}
@Override
- public synchronized void close() {
+ public void close() {
logger.tracef("Closing %s", this);
this.isLive = false;
--- End diff --
Now that isLive is volatile and the sync is removed, shouldn't this be updated by an atomic field updater? No?
---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/2494#discussion_r246568743
--- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
@Override
- public synchronized int getNumberOfMessages() {
- return messages.size();
+ public int getNumberOfMessages() {
+ while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+ Thread.yield();
+ continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+ }
}
@Override
- public synchronized void setMessages(PagedMessage[] messages) {
+ public void setMessages(PagedMessage[] messages) {
// This method shouldn't be called on liveCache, but we will provide the implementation for it anyway
for (PagedMessage msg : messages) {
addLiveMessage(msg);
}
}
@Override
- public synchronized PagedMessage getMessage(int messageNumber) {
- if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
- } else {
+ public PagedMessage getMessage(int messageNumber) {
+ if (messageNumber < 0) {
return null;
}
+ //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends
+ long size = lastSeenProducerIndex;
+ if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+ Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
--- End diff --
Surely if this occurs, there's some issue......
---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/2494#discussion_r246599701
--- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
@Override
- public synchronized int getNumberOfMessages() {
- return messages.size();
+ public int getNumberOfMessages() {
+ while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+ Thread.yield();
+ continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+ }
}
@Override
- public synchronized void setMessages(PagedMessage[] messages) {
+ public void setMessages(PagedMessage[] messages) {
// This method shouldn't be called on liveCache, but we will provide the implementation for it anyway
for (PagedMessage msg : messages) {
addLiveMessage(msg);
}
}
@Override
- public synchronized PagedMessage getMessage(int messageNumber) {
- if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
- } else {
+ public PagedMessage getMessage(int messageNumber) {
+ if (messageNumber < 0) {
return null;
}
+ //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends
+ long size = lastSeenProducerIndex;
+ if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+ Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
+ return null;
+ }
+ //publish it for others consumers
+ LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
+ }
+ final AtomicChunk<PagedMessage> buffer;
+ final int offset;
+ if (messageNumber >= chunkSize) {
+ offset = messageNumber & chunkMask;
+ //slow path is moved in a separate method
+ buffer = jump(messageNumber, size);
+ } else {
+ offset = messageNumber;
+ buffer = consumerBuffer;
+ }
+ //NOTE: producerIndex is being updated before setting a new value ie on consumer side need to spin until a not null value is set
+ PagedMessage msg;
+ while ((msg = buffer.get(offset)) == null) {
+ Thread.yield();
+ }
+ return msg;
+ }
+
+ /**
+ * Implements a lock-free version of the optimization used on {@link java.util.LinkedList#get(int)} to speed up queries
+ * ie backward search of a node if needed.
+ */
+ private AtomicChunk<PagedMessage> jump(final int messageNumber, final long size) {
+ //fast division by a power of 2
+ final int jumps = messageNumber >> chunkSizeLog2;
--- End diff --
Eheh, you're right :P, but the code of this collection came from a version where `chunkSize` was not a static final constant; in the new version it is clearer why I have done it.
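The shift trick being discussed is standard: for a power-of-two `chunkSize`, `x >> chunkSizeLog2` equals `x / chunkSize` and `x & (chunkSize - 1)` equals `x % chunkSize` for non-negative `x`. A small sketch (the constant values are illustrative assumptions, not the PR's actual chunk size):

```java
// With chunkSize == 1 << chunkSizeLog2, chunk index and in-chunk offset
// reduce to a shift and a mask — the "fast division by a power of 2"
// mentioned in the diff's comment.
final class ChunkMath {

   static final int CHUNK_SIZE_LOG2 = 5;
   static final int CHUNK_SIZE = 1 << CHUNK_SIZE_LOG2; // 32
   static final int CHUNK_MASK = CHUNK_SIZE - 1;

   static int chunkIndex(int messageNumber) {
      return messageNumber >> CHUNK_SIZE_LOG2;   // == messageNumber / CHUNK_SIZE
   }

   static int offsetInChunk(int messageNumber) {
      return messageNumber & CHUNK_MASK;         // == messageNumber % CHUNK_SIZE
   }

   public static void main(String[] args) {
      // message 100 lives in chunk 3 at offset 4 (100 = 3 * 32 + 4)
      System.out.println(chunkIndex(100) + " " + offsetInChunk(100)); // 3 4
   }
}
```

Note the point raised in the review still stands: once `chunkSize` is a `static final` constant, the JIT compiles a division by it into the same shift, so the explicit shift is an optimization inherited from when the size was a runtime value.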
---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/2494#discussion_r246558406
--- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---
@@ -28,15 +28,49 @@
/**
* This is the same as PageCache, however this is for the page that's being currently written.
*/
--- End diff --
Isn't this mixing the collection implementation itself into the LivePageCache?
Isn't there a way to implement this logic in its own structure? Like PageCache using a generic ChunkArray (a name I just came up with here)?
I'm a bit concerned about maintaining the business side of this issue (that is, the PageCache) together with the speedy implementation of a collection.
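The "generic ChunkArray" idea — an append-only list stored as a linked chain of fixed-size array chunks, so appends never copy existing elements — can be sketched in a much-simplified form. This is not the PR's lock-free extraction (that is `ConcurrentAppendOnlyChunkedList`, quoted elsewhere in this thread); it uses coarse synchronization purely to keep the structural idea readable, and all names here are illustrative.

```java
// Simplified sketch of a chunked append-only list: grow by linking a new
// fixed-size chunk, never by copying; index by chunk-jumps plus offset.
final class ChunkedAppendOnlyList<T> {

   private static final int CHUNK_SIZE = 32;

   private static final class Chunk {
      final Object[] elements = new Object[CHUNK_SIZE];
      Chunk next;
   }

   private final Chunk first = new Chunk();
   private Chunk last = first;
   private int size;

   public synchronized void add(T element) {
      int offset = size % CHUNK_SIZE;
      if (offset == 0 && size > 0) {
         Chunk chunk = new Chunk();
         last.next = chunk;   // grow by linking a new chunk
         last = chunk;
      }
      last.elements[offset] = element;
      size++;
   }

   @SuppressWarnings("unchecked")
   public synchronized T get(int index) {
      if (index < 0 || index >= size) {
         return null;          // mirrors the cache's "not there yet" behavior
      }
      Chunk chunk = first;
      for (int jumps = index / CHUNK_SIZE; jumps > 0; jumps--) {
         chunk = chunk.next;   // walk forward chunk by chunk
      }
      return (T) chunk.elements[index % CHUNK_SIZE];
   }

   public synchronized int size() {
      return size;
   }

   public static void main(String[] args) {
      ChunkedAppendOnlyList<Integer> list = new ChunkedAppendOnlyList<>();
      for (int i = 0; i < 100; i++) {
         list.add(i);
      }
      System.out.println(list.size() + " " + list.get(99)); // 100 99
   }
}
```

Splitting the structure out this way is what lets the PageCache keep only the business logic, with the collection's concurrency tricks tested in isolation.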
---
[GitHub] activemq-artemis issue #2494: ARTEMIS-2224 Reduce contention on LivePageCach...
Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on the issue:
https://github.com/apache/activemq-artemis/pull/2494
The effects of the contention on `LivePageCacheImpl` are more visible after applying https://github.com/apache/activemq-artemis/pull/2484
---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/2494#discussion_r246601217
--- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java ---
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.function.IntFunction;
+
+/**
+ * This collection is a concurrent append-only list that grows in chunks.<br>
+ * It's safe to be used by many threads concurrently and has a max capacity of {@link Integer#MAX_VALUE}.
+ */
+public final class ConcurrentAppendOnlyChunkedList<T> {
--- End diff --
This is a lot better!
---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/2494#discussion_r246567995
--- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
@Override
- public synchronized int getNumberOfMessages() {
- return messages.size();
+ public int getNumberOfMessages() {
+ while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+ Thread.yield();
+ continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+ }
}
@Override
- public synchronized void setMessages(PagedMessage[] messages) {
+ public void setMessages(PagedMessage[] messages) {
// This method shouldn't be called on liveCache, but we will provide the implementation for it anyway
for (PagedMessage msg : messages) {
addLiveMessage(msg);
}
}
@Override
- public synchronized PagedMessage getMessage(int messageNumber) {
- if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
- } else {
+ public PagedMessage getMessage(int messageNumber) {
+ if (messageNumber < 0) {
return null;
}
+ //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends
+ long size = lastSeenProducerIndex;
+ if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+ Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
+ return null;
+ }
+ //publish it for others consumers
+ LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
+ }
+ final AtomicChunk<PagedMessage> buffer;
+ final int offset;
+ if (messageNumber >= chunkSize) {
+ offset = messageNumber & chunkMask;
+ //slow path is moved in a separate method
+ buffer = jump(messageNumber, size);
+ } else {
+ offset = messageNumber;
+ buffer = consumerBuffer;
+ }
+ //NOTE: producerIndex is being updated before setting a new value ie on consumer side need to spin until a not null value is set
+ PagedMessage msg;
+ while ((msg = buffer.get(offset)) == null) {
+ Thread.yield();
+ }
+ return msg;
+ }
+
+ /**
+ * Implements a lock-free version of the optimization used on {@link java.util.LinkedList#get(int)} to speed up queries
+ * ie backward search of a node if needed.
+ */
+ private AtomicChunk<PagedMessage> jump(final int messageNumber, final long size) {
+ //fast division by a power of 2
+ final int jumps = messageNumber >> chunkSizeLog2;
+ //size is never allowed to be > Integer.MAX_VALUE
+ final int lastChunkIndex = (int) size >> chunkSizeLog2;
+ int requiredJumps = jumps;
+ AtomicChunk<PagedMessage> jumpBuffer = null;
+ boolean jumpForward = true;
+ int distanceFromLastChunkIndex = lastChunkIndex - jumps;
+ //is it worth going backward from lastChunkIndex?
+ //check first against the value we already have: if it isn't worth it, there is no point loading the producerBuffer
+ if (distanceFromLastChunkIndex < jumps) {
+ final AtomicChunk<PagedMessage> producer = producerBuffer;
+ //producer is a moving, always-increasing target, i.e. better to re-check the distance
+ distanceFromLastChunkIndex = producer.index - jumps;
+ if (distanceFromLastChunkIndex < jumps) {
+ //we're saving some jumps, i.e. it is fine to go backward from here
+ jumpBuffer = producer;
+ requiredJumps = distanceFromLastChunkIndex;
+ jumpForward = false;
+ }
+ }
+ //start from the consumer buffer only if needed
+ if (jumpBuffer == null) {
+ jumpBuffer = consumerBuffer;
+ }
+ for (int i = 0; i < requiredJumps; i++) {
+ //next chunk is always set if below a read producerIndex value
+ //previous chunk is final and can be safely read
+ jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev;
+ }
+ return jumpBuffer;
}
@Override
- public synchronized boolean isLive() {
+ public boolean isLive() {
return isLive;
}
@Override
- public synchronized void addLiveMessage(PagedMessage message) {
+ public void addLiveMessage(PagedMessage message) {
if (message.getMessage().isLargeMessage()) {
((LargeServerMessage) message.getMessage()).incrementDelayDeletionCount();
}
- this.messages.add(message);
+ while (true) {
+ final long pIndex = producerIndex;
+ if (pIndex != RESIZING) {
+ if (pIndex == Integer.MAX_VALUE) {
+ throw new IllegalStateException("can't add more than " + Integer.MAX_VALUE + " messages");
+ }
+ //load acquire the current producer buffer
+ final AtomicChunk<PagedMessage> producerBuffer = this.producerBuffer;
+ final int pOffset = (int) (pIndex & chunkMask);
+ //only the first message to a chunk can attempt to resize
+ if (pOffset == 0) {
+ if (appendChunkAndMessage(producerBuffer, pIndex, message)) {
+ return;
+ }
+ } else if (PRODUCER_INDEX_UPDATER.compareAndSet(this, pIndex, pIndex + 1)) {
+ //this.producerBuffer is the correct buffer to append a message: it is guarded by the producerIndex logic
+ //NOTE: producerIndex is being updated before setting a new value
+ producerBuffer.lazySet(pOffset, message);
+ return;
+ }
+ }
+ Thread.yield();
+ }
+ }
+
+ private boolean appendChunkAndMessage(AtomicChunk<PagedMessage> producerBuffer, long pIndex, PagedMessage message) {
+ if (!PRODUCER_INDEX_UPDATER.compareAndSet(this, pIndex, RESIZING)) {
+ return false;
+ }
+ final AtomicChunk<PagedMessage> newChunk;
+ try {
+ final int index = (int) (pIndex >> chunkSizeLog2);
+ newChunk = new AtomicChunk<>(index, producerBuffer, chunkSize);
+ } catch (OutOfMemoryError oom) {
+ //unblock producerIndex without updating it
+ PRODUCER_INDEX_UPDATER.lazySet(this, pIndex);
+ throw oom;
+ }
+ //adding the message to it
+ newChunk.lazySet(0, message);
+ //linking it to the old one, if any
+ if (producerBuffer != null) {
+ //a plain store is enough, given that producerIndex prevents any reader/writer to access it
+ producerBuffer.next = newChunk;
+ } else {
+ //it's the first one
+ this.consumerBuffer = newChunk;
+ }
+ //making it the current produced one
+ this.producerBuffer = newChunk;
+ //store release any previous write and "unblock" anyone waiting resizing to finish
+ PRODUCER_INDEX_UPDATER.lazySet(this, pIndex + 1);
+ return true;
}
@Override
- public synchronized void close() {
+ public void close() {
logger.tracef("Closing %s", this);
this.isLive = false;
}
+ private static PagedMessage[] EMPTY_MSG = null;
+
+ private static PagedMessage[] noMessages() {
+ //it is a benign race: no need strong initializations here
+ PagedMessage[] empty = EMPTY_MSG;
+ if (empty != null) {
+ return empty;
+ } else {
+ empty = new PagedMessage[0];
--- End diff --
Why not simply make noMessages return a static empty array, e.g. a shared PagedMessage[0], similar in nature to https://android.googlesource.com/platform/libcore/+/jb-mr2-release/luni/src/main/java/libcore/util/EmptyArray.java? That saves instantiating it on every call.
It also avoids the race condition the current lazy-initialization code appears to have: if two threads call noMessages concurrently, it is possible to get two PagedMessage[0] objects created.
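For illustration, a minimal sketch of the idiom suggested here (class and method names are hypothetical, not the PR's actual code): a `static final` empty array is created once during class initialization, so every call returns the same instance and the lazy-init race disappears.

```java
// Hypothetical sketch of the suggested EmptyArray-style constant.
// PagedMessage is stood in for by a plain marker interface here.
public final class EmptyArrayDemo {

   interface PagedMessage { }

   // Created once at class-initialization time (thread-safe per the JMM),
   // so every caller sees the same zero-length instance: no allocation, no race.
   private static final PagedMessage[] NO_MESSAGES = new PagedMessage[0];

   static PagedMessage[] noMessages() {
      return NO_MESSAGES;
   }

   public static void main(String[] args) {
      // Same instance on every call.
      assert noMessages() == noMessages();
      assert noMessages().length == 0;
      System.out.println("ok");
   }
}
```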
---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/2494#discussion_r246574775
--- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---
@@ -48,54 +82,228 @@ public long getPageId() {
[... same diff hunk as quoted above, trimmed ...]
+ private static PagedMessage[] EMPTY_MSG = null;
+
+ private static PagedMessage[] noMessages() {
+ //it is a benign race: no need strong initializations here
+ PagedMessage[] empty = EMPTY_MSG;
+ if (empty != null) {
+ return empty;
+ } else {
+ empty = new PagedMessage[0];
+ EMPTY_MSG = empty;
+ }
+ return empty;
+ }
+
@Override
- public synchronized PagedMessage[] getMessages() {
- return messages.toArray(new PagedMessage[messages.size()]);
+ public PagedMessage[] getMessages() {
+ long currentSize;
+ while ((currentSize = producerIndex) == RESIZING) {
--- End diff --
good catch! :+1: While creating a more generic collection I will refactor these bits to avoid duplication where possible
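One possible shape for that refactor (names are hypothetical, not the actual PR code): the `producerIndex`-vs-`RESIZING` spin performed by both `getMessage(int)` and `getMessages()` can be pulled into a single private helper. The sketch below uses `AtomicLong` to stay self-contained, where the PR itself uses an `AtomicLongFieldUpdater`.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of extracting the duplicated spin-read into one helper.
public final class SpinReadDemo {

   private static final long RESIZING = -1;

   private final AtomicLong producerIndex = new AtomicLong(0);

   // Single place that waits out a concurrent resize and returns a stable size.
   private long validProducerIndex() {
      long size;
      while ((size = producerIndex.get()) == RESIZING) {
         Thread.yield();
      }
      return size;
   }

   public int size() {
      return (int) Math.min(validProducerIndex(), Integer.MAX_VALUE);
   }

   public boolean contains(int messageNumber) {
      return messageNumber >= 0 && messageNumber < validProducerIndex();
   }

   void append() {
      producerIndex.incrementAndGet();
   }

   public static void main(String[] args) {
      SpinReadDemo demo = new SpinReadDemo();
      assert demo.size() == 0;
      demo.append();
      demo.append();
      assert demo.size() == 2;
      assert demo.contains(1) && !demo.contains(2);
      System.out.println("ok");
   }
}
```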
---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/2494#discussion_r246602226
--- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java ---
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.function.IntFunction;
+
+/**
+ * This collection is a concurrent append-only list that grows in chunks.<br>
+ * It's safe to be used by many threads concurrently and has a max capacity of {@link Integer#MAX_VALUE}.
+ */
+public final class ConcurrentAppendOnlyChunkedList<T> {
--- End diff --
Thanks to you guys and one day I will be better in naming things too I swear :D
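To make the chunk arithmetic in the quoted code concrete, here is a minimal single-threaded sketch of an append-only chunked list (deliberately simplified: no CAS'd producerIndex, no RESIZING state, no backward jump optimization). The chunk index is `i >> chunkSizeLog2` and the offset inside a chunk is `i & chunkMask`, exactly as in `jump(...)` and `addLiveMessage(...)`.

```java
import java.util.ArrayList;
import java.util.List;

// Single-threaded sketch of the chunked append-only layout used by the PR.
// The concurrent version replaces this List bookkeeping with linked
// AtomicChunk nodes guarded by a CAS'd producerIndex.
public final class ChunkedListSketch<T> {

   private final int chunkSizeLog2;
   private final int chunkSize;
   private final int chunkMask;
   private final List<Object[]> chunks = new ArrayList<>();
   private int size;

   public ChunkedListSketch(int chunkSizeLog2) {
      this.chunkSizeLog2 = chunkSizeLog2;
      this.chunkSize = 1 << chunkSizeLog2;   // chunk capacity, a power of 2
      this.chunkMask = chunkSize - 1;        // fast modulo for offsets
   }

   public void add(T element) {
      final int offset = size & chunkMask;
      if (offset == 0) {
         // first element of a new chunk: allocate and link it
         chunks.add(new Object[chunkSize]);
      }
      chunks.get(size >> chunkSizeLog2)[offset] = element;
      size++;
   }

   @SuppressWarnings("unchecked")
   public T get(int index) {
      if (index < 0 || index >= size) {
         return null;
      }
      // fast division/modulo by a power of 2, as in the PR's jump(...)
      return (T) chunks.get(index >> chunkSizeLog2)[index & chunkMask];
   }

   public int size() {
      return size;
   }

   public static void main(String[] args) {
      ChunkedListSketch<Integer> list = new ChunkedListSketch<>(2); // chunks of 4
      for (int i = 0; i < 10; i++) {
         list.add(i);
      }
      assert list.size() == 10;
      assert list.get(0) == 0 && list.get(9) == 9;
      assert list.get(10) == null;
      System.out.println("ok");
   }
}
```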
---
[GitHub] activemq-artemis issue #2494: ARTEMIS-2224 Reduce contention on LivePageCach...
Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on the issue:
https://github.com/apache/activemq-artemis/pull/2494
Thanks for the review guys — without these comments the work would not have ended up as good as it could be :100:
Re the logic, please tell me if anything is unclear or not commented as it deserves, or if I should write down a more concise summary of how this collection works, ok?
---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/2494#discussion_r246569523
--- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---
@@ -48,54 +82,228 @@ public long getPageId() {
[... same diff hunk as quoted above, trimmed ...]
@Override
- public synchronized PagedMessage[] getMessages() {
- return messages.toArray(new PagedMessage[messages.size()]);
+ public PagedMessage[] getMessages() {
+ long currentSize;
+ while ((currentSize = producerIndex) == RESIZING) {
--- End diff --
There seem to be sections of code duplicated with getMessage(int)
---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/2494#discussion_r246568367
--- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---
@@ -48,54 +82,228 @@ public long getPageId() {
[... same diff hunk as quoted above, trimmed ...]
+ private static PagedMessage[] EMPTY_MSG = null;
+
+ private static PagedMessage[] noMessages() {
+ //it is a benign race: no need strong initializations here
+ PagedMessage[] empty = EMPTY_MSG;
+ if (empty != null) {
+ return empty;
+ } else {
+ empty = new PagedMessage[0];
--- End diff --
yep, I did it on purpose: it is a benign race, because there is only a very low chance a new empty array would be allocated several times, but I agree :+1:
It will make the code much simpler
---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/2494#discussion_r246601169
--- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
@Override
- public synchronized int getNumberOfMessages() {
- return messages.size();
+ public int getNumberOfMessages() {
+ while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+ Thread.yield();
+ continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+ }
}
@Override
- public synchronized void setMessages(PagedMessage[] messages) {
+ public void setMessages(PagedMessage[] messages) {
// This method shouldn't be called on liveCache, but we will provide the implementation for it anyway
for (PagedMessage msg : messages) {
addLiveMessage(msg);
}
}
@Override
- public synchronized PagedMessage getMessage(int messageNumber) {
- if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
- } else {
+ public PagedMessage getMessage(int messageNumber) {
+ if (messageNumber < 0) {
return null;
}
+ //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends
+ long size = lastSeenProducerIndex;
+ if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+ Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
+ return null;
+ }
+ //publish it for others consumers
+ LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
+ }
+ final AtomicChunk<PagedMessage> buffer;
+ final int offset;
+ if (messageNumber >= chunkSize) {
+ offset = messageNumber & chunkMask;
+ //slow path is moved in a separate method
+ buffer = jump(messageNumber, size);
+ } else {
+ offset = messageNumber;
+ buffer = consumerBuffer;
+ }
+ //NOTE: producerIndex is being updated before setting a new value ie on consumer side need to spin until a not null value is set
+ PagedMessage msg;
+ while ((msg = buffer.get(offset)) == null) {
+ Thread.yield();
+ }
+ return msg;
+ }
+
+ /**
+ * Implements a lock-free version of the optimization used on {@link java.util.LinkedList#get(int)} to speed up queries
+ * ie backward search of a node if needed.
+ */
+ private AtomicChunk<PagedMessage> jump(final int messageNumber, final long size) {
+ //fast division by a power of 2
+ final int jumps = messageNumber >> chunkSizeLog2;
+ //size is never allowed to be > Integer.MAX_VALUE
+ final int lastChunkIndex = (int) size >> chunkSizeLog2;
+ int requiredJumps = jumps;
+ AtomicChunk<PagedMessage> jumpBuffer = null;
+ boolean jumpForward = true;
+ int distanceFromLastChunkIndex = lastChunkIndex - jumps;
+ //it's worth to go backward from lastChunkIndex?
+ //trying first to check against the value we already have: if it won't worth, won't make sense to load the producerBuffer
+ if (distanceFromLastChunkIndex < jumps) {
+ final AtomicChunk<PagedMessage> producer = producerBuffer;
+ //producer is a potential moving, always increasing, target ie better to re-check the distance
+ distanceFromLastChunkIndex = producer.index - jumps;
+ if (distanceFromLastChunkIndex < jumps) {
+ //we're saving some jumps ie is fine to go backward from here
+ jumpBuffer = producer;
+ requiredJumps = distanceFromLastChunkIndex;
+ jumpForward = false;
+ }
+ }
+ //start from the consumer buffer only is needed
+ if (jumpBuffer == null) {
+ jumpBuffer = consumerBuffer;
+ }
+ for (int i = 0; i < requiredJumps; i++) {
+ //next chunk is always set if below a read producerIndex value
+ //previous chunk is final and can be safely read
+ jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev;
+ }
+ return jumpBuffer;
}
@Override
- public synchronized boolean isLive() {
+ public boolean isLive() {
return isLive;
}
@Override
- public synchronized void addLiveMessage(PagedMessage message) {
+ public void addLiveMessage(PagedMessage message) {
if (message.getMessage().isLargeMessage()) {
((LargeServerMessage) message.getMessage()).incrementDelayDeletionCount();
}
- this.messages.add(message);
+ while (true) {
+ final long pIndex = producerIndex;
+ if (pIndex != RESIZING) {
+ if (pIndex == Integer.MAX_VALUE) {
+ throw new IllegalStateException("can't add more than " + Integer.MAX_VALUE + " messages");
+ }
+ //load acquire the current producer buffer
+ final AtomicChunk<PagedMessage> producerBuffer = this.producerBuffer;
+ final int pOffset = (int) (pIndex & chunkMask);
+ //only the first message to a chunk can attempt to resize
+ if (pOffset == 0) {
+ if (appendChunkAndMessage(producerBuffer, pIndex, message)) {
+ return;
+ }
+ } else if (PRODUCER_INDEX_UPDATER.compareAndSet(this, pIndex, pIndex + 1)) {
+ //this.producerBuffer is the correct buffer to append a message: it is guarded by the producerIndex logic
+ //NOTE: producerIndex is being updated before setting a new value
+ producerBuffer.lazySet(pOffset, message);
+ return;
+ }
+ }
+ Thread.yield();
+ }
+ }
+
+ private boolean appendChunkAndMessage(AtomicChunk<PagedMessage> producerBuffer, long pIndex, PagedMessage message) {
+ if (!PRODUCER_INDEX_UPDATER.compareAndSet(this, pIndex, RESIZING)) {
+ return false;
+ }
+ final AtomicChunk<PagedMessage> newChunk;
+ try {
+ final int index = (int) (pIndex >> chunkSizeLog2);
+ newChunk = new AtomicChunk<>(index, producerBuffer, chunkSize);
+ } catch (OutOfMemoryError oom) {
+ //unblock producerIndex without updating it
+ PRODUCER_INDEX_UPDATER.lazySet(this, pIndex);
+ throw oom;
+ }
+ //adding the message to it
+ newChunk.lazySet(0, message);
+ //linking it to the old one, if any
+ if (producerBuffer != null) {
+ //a plain store is enough, given that producerIndex prevents any reader/writer to access it
+ producerBuffer.next = newChunk;
+ } else {
+ //it's the first one
+ this.consumerBuffer = newChunk;
+ }
+ //making it the current produced one
+ this.producerBuffer = newChunk;
+ //store-release any previous write and "unblock" anyone waiting for the resize to finish
+ PRODUCER_INDEX_UPDATER.lazySet(this, pIndex + 1);
+ return true;
}
@Override
- public synchronized void close() {
+ public void close() {
logger.tracef("Closing %s", this);
this.isLive = false;
--- End diff --
This is a lot better!
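The append path in the diff above hinges on two ideas: chunk/slot selection via shift-and-mask on a monotonically increasing producerIndex, and a negative sentinel value that turns that index into a lock-free resize guard. A minimal sketch of that protocol (illustrative names, not the actual Artemis code; the chunk size of 32 is an assumption):

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch (not the Artemis implementation) of the producerIndex
// protocol: the index doubles as a lock-free guard, with a negative sentinel
// marking an in-progress chunk allocation.
final class ProducerIndexSketch {
    static final long RESIZING = -1;
    static final int CHUNK_SIZE_LOG2 = 5;                 // chunks of 32 slots
    static final int CHUNK_MASK = (1 << CHUNK_SIZE_LOG2) - 1;

    final AtomicLong producerIndex = new AtomicLong(0);

    // which chunk a given sequence number lands on (fast divide by 32)
    static int chunkOf(long index) {
        return (int) (index >> CHUNK_SIZE_LOG2);
    }

    // which slot inside that chunk (fast modulo 32)
    static int offsetOf(long index) {
        return (int) (index & CHUNK_MASK);
    }

    // claim one slot and return its sequence number, spinning while a
    // concurrent writer holds the RESIZING sentinel
    long claimSlot() {
        while (true) {
            final long pIndex = producerIndex.get();
            if (pIndex != RESIZING && producerIndex.compareAndSet(pIndex, pIndex + 1)) {
                return pIndex;
            }
            Thread.yield();
        }
    }
}
```

For example, with 32-slot chunks, sequence number 70 lands in chunk 2 at offset 6; only the writer claiming offset 0 of a chunk ever attempts the allocation, which is what keeps the slow path off the common append.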
---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/2494#discussion_r246600002
--- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---
@@ -28,15 +28,49 @@
/**
* This is the same as PageCache, however this is for the page that's being currently written.
*/
--- End diff --
@clebertsuconic @michaelandrepearce Good points guys: I have re-implemented the logic in a proper collection (but one that won't implement the canonical `Collection` types, because it is not a canonical collection at all)
---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/2494#discussion_r246570552
--- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java ---
@@ -864,21 +875,32 @@ private PageCursorInfo processACK(final PagePosition pos) {
return info;
}
+ private void installTXCallback(final Transaction tx, final PagePosition position) {
+ installTXCallback(tx, position, -1);
+ }
+
/**
* @param tx
* @param position
+ * @param persistentSize if negative it needs to be calculated on the fly
*/
- private void installTXCallback(final Transaction tx, final PagePosition position) {
+ private void installTXCallback(final Transaction tx, final PagePosition position, final long persistentSize) {
if (position.getRecordID() >= 0) {
// It needs to persist, otherwise the cursor will return to the first page position
tx.setContainsPersistent();
}
PageCursorInfo info = getPageInfo(position);
PageCache cache = info.getCache();
- long size = 0;
if (cache != null) {
- size = getPersistentSize(cache.getMessage(position.getMessageNr()));
+ final long size;
+ if (persistentSize < 0) {
--- End diff --
-1 is used as a reserved value in another point to trigger the cache lookup
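The sentinel convention described here (a negative persistentSize meaning "unknown, resolve it from the cache on demand") can be sketched as follows; the class and method names are hypothetical, not the actual Artemis API:

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of the sentinel convention: -1 (any negative value)
// means the persistent size was not pre-computed by the caller and must be
// resolved lazily, e.g. via a page-cache lookup.
final class PersistentSizeSentinel {
    static final long UNKNOWN = -1;

    static long resolve(long persistentSize, LongSupplier computeFromCache) {
        // only fall back to the (potentially expensive) cache query when
        // the caller did not already know the size
        return persistentSize < 0 ? computeFromCache.getAsLong() : persistentSize;
    }
}
```

This is the whole point of the second optimization in the PR description: when ackTx already knows the size, it passes it down and the page cache is never queried.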
---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/2494#discussion_r246567905
--- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---
@@ -28,15 +28,49 @@
/**
* This is the same as PageCache, however this is for the page that's being currently written.
*/
--- End diff --
Makes sense :+1:
---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/2494#discussion_r246577156
--- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
@Override
- public synchronized int getNumberOfMessages() {
- return messages.size();
+ public int getNumberOfMessages() {
+ while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+ Thread.yield();
+ continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+ }
}
@Override
- public synchronized void setMessages(PagedMessage[] messages) {
+ public void setMessages(PagedMessage[] messages) {
// This method shouldn't be called on liveCache, but we will provide the implementation for it anyway
for (PagedMessage msg : messages) {
addLiveMessage(msg);
}
}
@Override
- public synchronized PagedMessage getMessage(int messageNumber) {
- if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
- } else {
+ public PagedMessage getMessage(int messageNumber) {
+ if (messageNumber < 0) {
return null;
}
+ //it allows fewer cache invalidations against producerIndex if there are bursts of appends
+ long size = lastSeenProducerIndex;
+ if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+ Thread.yield();
+ }
+ //is the message over the current size?
+ if (messageNumber >= size) {
+ return null;
+ }
+ //publish it for other consumers
+ LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
+ }
+ final AtomicChunk<PagedMessage> buffer;
+ final int offset;
+ if (messageNumber >= chunkSize) {
+ offset = messageNumber & chunkMask;
+ //slow path is moved in a separate method
+ buffer = jump(messageNumber, size);
+ } else {
+ offset = messageNumber;
+ buffer = consumerBuffer;
+ }
+ //NOTE: producerIndex is updated before the new value is set, i.e. the consumer side needs to spin until a non-null value appears
+ PagedMessage msg;
+ while ((msg = buffer.get(offset)) == null) {
+ Thread.yield();
+ }
+ return msg;
+ }
+
+ /**
+ * Implements a lock-free version of the optimization used on {@link java.util.LinkedList#get(int)} to speed up queries
+ * ie backward search of a node if needed.
+ */
+ private AtomicChunk<PagedMessage> jump(final int messageNumber, final long size) {
+ //fast division by a power of 2
+ final int jumps = messageNumber >> chunkSizeLog2;
--- End diff --
cough ;) ... for the same comment you left me .... :P
---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/2494#discussion_r246602031
--- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
@Override
- public synchronized int getNumberOfMessages() {
- return messages.size();
+ public int getNumberOfMessages() {
+ while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+ Thread.yield();
+ continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+ }
}
@Override
- public synchronized void setMessages(PagedMessage[] messages) {
+ public void setMessages(PagedMessage[] messages) {
// This method shouldn't be called on liveCache, but we will provide the implementation for it anyway
for (PagedMessage msg : messages) {
addLiveMessage(msg);
}
}
@Override
- public synchronized PagedMessage getMessage(int messageNumber) {
- if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
- } else {
+ public PagedMessage getMessage(int messageNumber) {
+ if (messageNumber < 0) {
return null;
}
+ //it allows fewer cache invalidations against producerIndex if there are bursts of appends
+ long size = lastSeenProducerIndex;
+ if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+ Thread.yield();
+ }
+ //is the message over the current size?
+ if (messageNumber >= size) {
--- End diff --
It's not that easy here: this one is not a common collection (in theory; we can get agreement on the contract for sure): it is more an append-only list that also allows indexed queries, like a map.
If I remember correctly it is similar to a [hashed array tree](https://en.wikipedia.org/wiki/Hashed_array_tree), where the top-level directory is a doubly linked list of "folders" (instead of an array, as in the original implementation): indeed in the code there are chunkIndex (== key into the top-level directory) and offset (== key of the leaf within a directory).
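The jump heuristic discussed in this thread mirrors `LinkedList#get(int)`: walk forward from the head or backward from the tail, whichever needs fewer hops. A minimal sketch of just that direction choice, with illustrative names (the real code additionally re-reads the moving producer chunk):

```java
// Sketch of the jump-direction heuristic: given the target message number and
// the index of the last (tail) chunk, decide whether walking forward from the
// head or backward from the tail requires fewer hops.
final class JumpPlan {
    final boolean forward;
    final int hops;

    JumpPlan(boolean forward, int hops) {
        this.forward = forward;
        this.hops = hops;
    }

    static JumpPlan of(int messageNumber, int lastChunkIndex, int chunkSizeLog2) {
        final int forwardHops = messageNumber >> chunkSizeLog2;   // chunk index of the target
        final int backwardHops = lastChunkIndex - forwardHops;    // hops back from the tail
        return backwardHops < forwardHops
            ? new JumpPlan(false, backwardHops)
            : new JumpPlan(true, forwardHops);
    }
}
```

With 32-slot chunks (log2 = 5) and the tail at chunk 3, message 70 lives in chunk 2, so one backward hop from the tail beats two forward hops from the head.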
---
[GitHub] activemq-artemis issue #2494: ARTEMIS-2224 Reduce contention on LivePageCach...
Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on the issue:
https://github.com/apache/activemq-artemis/pull/2494
Most of the benefits on memory footprint are already explained on https://github.com/qihongxu/activemq-artemis/pull/1, while regarding contention I have already built several contention graphs showing that this implementation scales linearly with the number of producers on a topic, without any contention.
In addition, several JMH microbenchmarks (not attached here) show that it is always faster (even single-threaded) than the original one, for any operation and in any condition.
The only missing bit is an end-to-end test after applying #2484
---