You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by mc...@apache.org on 2021/08/01 13:55:51 UTC

[cassandra] branch cassandra-3.11 updated: HintsBuffer rewritten CAS loops to atomic adds, CommitLogSegment adds ConstantBackoffCAS Algorithm

This is an automated email from the ASF dual-hosted git repository.

mck pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.11 by this push:
     new f575eea  HintsBuffer rewritten CAS loops to atomic adds, CommitLogSegment adds ConstantBackoffCAS Algorithm
f575eea is described below

commit f575eea034def1103c5403b11fc8fe58b60d3ebe
Author: Mick Semb Wever <mc...@apache.org>
AuthorDate: Fri Jul 31 22:36:03 2020 +0200

    HintsBuffer rewritten CAS loops to atomic adds, CommitLogSegment adds ConstantBackoffCAS Algorithm
    
     patch by Mick Semb Wever; reviewed by Berenguer Blasi, Benjamin Lerer for CASSANDRA-16072
---
 CHANGES.txt                                        |  1 +
 .../cassandra/db/commitlog/CommitLogSegment.java   |  2 ++
 .../org/apache/cassandra/hints/HintsBuffer.java    | 37 ++++++++++------------
 3 files changed, 19 insertions(+), 21 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index d80ab6f..89610c0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.11.12
+ * Reduce thread contention in CommitLogSegment and HintsBuffer (CASSANDRA-16072)
 Merged from 3.0:
  * Handle properly UnsatisfiedLinkError in NativeLibrary#getProcessID() (CASSANDRA-16578)
  * Remove mutation data from error log message (CASSANDRA-16817)
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 193be91..af25b4a 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -26,6 +26,7 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.LockSupport;
 import java.util.zip.CRC32;
 
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -253,6 +254,7 @@ public abstract class CommitLogSegment
                 assert buffer != null;
                 return prev;
             }
+            LockSupport.parkNanos(1); // ConstantBackoffCAS Algorithm from https://arxiv.org/pdf/1305.5800.pdf
         }
     }
 
diff --git a/src/java/org/apache/cassandra/hints/HintsBuffer.java b/src/java/org/apache/cassandra/hints/HintsBuffer.java
index 835a3be..d944b4d 100644
--- a/src/java/org/apache/cassandra/hints/HintsBuffer.java
+++ b/src/java/org/apache/cassandra/hints/HintsBuffer.java
@@ -23,7 +23,7 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.CRC32;
 
 import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -52,10 +52,9 @@ final class HintsBuffer
 {
     // hint entry overhead in bytes (int length, int length checksum, int body checksum)
     static final int ENTRY_OVERHEAD_SIZE = 12;
-    static final int CLOSED = -1;
 
     private final ByteBuffer slab; // the underlying backing ByteBuffer for all the serialized hints
-    private final AtomicInteger position; // the position in the slab that we currently allocate from
+    private final AtomicLong position; // the position in the slab that we currently allocate from
 
     private final ConcurrentMap<UUID, Queue<Integer>> offsets;
     private final OpOrder appendOrder;
@@ -64,7 +63,7 @@ final class HintsBuffer
     {
         this.slab = slab;
 
-        position = new AtomicInteger();
+        position = new AtomicLong();
         offsets = new ConcurrentHashMap<>();
         appendOrder = new OpOrder();
     }
@@ -76,7 +75,7 @@ final class HintsBuffer
 
     boolean isClosed()
     {
-        return position.get() == CLOSED;
+        return position.get() < 0;
     }
 
     int capacity()
@@ -86,8 +85,8 @@ final class HintsBuffer
 
     int remaining()
     {
-        int pos = position.get();
-        return pos == CLOSED ? 0 : capacity() - pos;
+        long pos = position.get();
+        return (int) (pos < 0 ? 0 : Math.max(0, capacity() - pos));
     }
 
     HintsBuffer recycle()
@@ -177,25 +176,21 @@ final class HintsBuffer
         return new Allocation(offset, totalSize, opGroup);
     }
 
+    // allocate bytes in the slab, or return negative if not enough space
     private int allocateBytes(int totalSize)
     {
-        while (true)
-        {
-            int prev = position.get();
-            int next = prev + totalSize;
-
-            if (prev == CLOSED) // the slab has been 'closed'
-                return CLOSED;
+        long prev = position.getAndAdd(totalSize);
 
-            if (next > slab.capacity())
-            {
-                position.set(CLOSED); // mark the slab as no longer allocating if we've exceeded its capacity
-                return CLOSED;
-            }
+        if (prev < 0) // the slab has been 'closed'
+            return -1;
 
-            if (position.compareAndSet(prev, next))
-                return prev;
+        if ((prev + totalSize) > slab.capacity())
+        {
+            position.set(Long.MIN_VALUE); // mark the slab as no longer allocating if we've exceeded its capacity
+            return -1;
         }
+
+        return (int)prev;
     }
 
     private void put(UUID hostId, int offset)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org