You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by mc...@apache.org on 2021/08/01 13:55:51 UTC
[cassandra] branch cassandra-3.11 updated: HintsBuffer rewritten CAS
loops to atomic adds, CommitLogSegment adds ConstantBackoffCAS Algorithm
This is an automated email from the ASF dual-hosted git repository.
mck pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.11 by this push:
new f575eea HintsBuffer rewritten CAS loops to atomic adds, CommitLogSegment adds ConstantBackoffCAS Algorithm
f575eea is described below
commit f575eea034def1103c5403b11fc8fe58b60d3ebe
Author: Mick Semb Wever <mc...@apache.org>
AuthorDate: Fri Jul 31 22:36:03 2020 +0200
HintsBuffer rewritten CAS loops to atomic adds, CommitLogSegment adds ConstantBackoffCAS Algorithm
patch by Mick Semb Wever; reviewed by Berenguer Blasi, Benjamin Lerer for CASSANDRA-16072
---
CHANGES.txt | 1 +
.../cassandra/db/commitlog/CommitLogSegment.java | 2 ++
.../org/apache/cassandra/hints/HintsBuffer.java | 37 ++++++++++------------
3 files changed, 19 insertions(+), 21 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index d80ab6f..89610c0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.11.12
+ * Reduce thread contention in CommitLogSegment and HintsBuffer (CASSANDRA-16072)
Merged from 3.0:
* Handle properly UnsatisfiedLinkError in NativeLibrary#getProcessID() (CASSANDRA-16578)
* Remove mutation data from error log message (CASSANDRA-16817)
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 193be91..af25b4a 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -26,6 +26,7 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.LockSupport;
import java.util.zip.CRC32;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -253,6 +254,7 @@ public abstract class CommitLogSegment
assert buffer != null;
return prev;
}
+ LockSupport.parkNanos(1); // ConstantBackoffCAS Algorithm from https://arxiv.org/pdf/1305.5800.pdf
}
}
diff --git a/src/java/org/apache/cassandra/hints/HintsBuffer.java b/src/java/org/apache/cassandra/hints/HintsBuffer.java
index 835a3be..d944b4d 100644
--- a/src/java/org/apache/cassandra/hints/HintsBuffer.java
+++ b/src/java/org/apache/cassandra/hints/HintsBuffer.java
@@ -23,7 +23,7 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.CRC32;
import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -52,10 +52,9 @@ final class HintsBuffer
{
// hint entry overhead in bytes (int length, int length checksum, int body checksum)
static final int ENTRY_OVERHEAD_SIZE = 12;
- static final int CLOSED = -1;
private final ByteBuffer slab; // the underlying backing ByteBuffer for all the serialized hints
- private final AtomicInteger position; // the position in the slab that we currently allocate from
+ private final AtomicLong position; // the position in the slab that we currently allocate from
private final ConcurrentMap<UUID, Queue<Integer>> offsets;
private final OpOrder appendOrder;
@@ -64,7 +63,7 @@ final class HintsBuffer
{
this.slab = slab;
- position = new AtomicInteger();
+ position = new AtomicLong();
offsets = new ConcurrentHashMap<>();
appendOrder = new OpOrder();
}
@@ -76,7 +75,7 @@ final class HintsBuffer
boolean isClosed()
{
- return position.get() == CLOSED;
+ return position.get() < 0;
}
int capacity()
@@ -86,8 +85,8 @@ final class HintsBuffer
int remaining()
{
- int pos = position.get();
- return pos == CLOSED ? 0 : capacity() - pos;
+ long pos = position.get();
+ return (int) (pos < 0 ? 0 : Math.max(0, capacity() - pos));
}
HintsBuffer recycle()
@@ -177,25 +176,21 @@ final class HintsBuffer
return new Allocation(offset, totalSize, opGroup);
}
+ // allocate bytes in the slab, or return negative if not enough space
private int allocateBytes(int totalSize)
{
- while (true)
- {
- int prev = position.get();
- int next = prev + totalSize;
-
- if (prev == CLOSED) // the slab has been 'closed'
- return CLOSED;
+ long prev = position.getAndAdd(totalSize);
- if (next > slab.capacity())
- {
- position.set(CLOSED); // mark the slab as no longer allocating if we've exceeded its capacity
- return CLOSED;
- }
+ if (prev < 0) // the slab has been 'closed'
+ return -1;
- if (position.compareAndSet(prev, next))
- return prev;
+ if ((prev + totalSize) > slab.capacity())
+ {
+ position.set(Long.MIN_VALUE); // mark the slab as no longer allocating if we've exceeded its capacity
+ return -1;
}
+
+ return (int)prev;
}
private void put(UUID hostId, int offset)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org