You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2014/12/12 15:19:22 UTC

[2/3] cassandra git commit: Scale memtable slab allocation logarithmically

Scale memtable slab allocation logarithmically

patch by benedict; reviewed by jay patel for CASSANDRA-7882


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/51f7cad4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/51f7cad4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/51f7cad4

Branch: refs/heads/trunk
Commit: 51f7cad480c335f07cfd74511c4aae91175b1bb7
Parents: cd714a1
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Dec 12 14:17:04 2014 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Dec 12 14:17:04 2014 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/utils/memory/NativeAllocator.java | 156 ++++++++++---------
 2 files changed, 83 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/51f7cad4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 641b1f9..2571a09 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.3
+ * Scale memtable slab allocation logarithmically (CASSANDRA-7882)
  * cassandra-stress simultaneous inserts over same seed (CASSANDRA-7964)
  * Reduce cassandra-stress sampling memory requirements (CASSANDRA-7926)
  * Ensure memtable flush cannot expire commit log entries from its future (CASSANDRA-8383)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51f7cad4/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
index 1b5dcf2..0e15ed2 100644
--- a/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
@@ -18,13 +18,15 @@
 package org.apache.cassandra.utils.memory;
 
 import java.lang.reflect.Field;
-
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Cell;
 import org.apache.cassandra.db.CounterCell;
 import org.apache.cassandra.db.DecoratedKey;
@@ -35,25 +37,29 @@ import org.apache.cassandra.db.NativeCounterCell;
 import org.apache.cassandra.db.NativeDecoratedKey;
 import org.apache.cassandra.db.NativeDeletedCell;
 import org.apache.cassandra.db.NativeExpiringCell;
+import org.apache.cassandra.io.util.IAllocator;
 import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import sun.misc.Unsafe;
 
 public class NativeAllocator extends MemtableAllocator
 {
-    private static final Logger logger = LoggerFactory.getLogger(NativeAllocator.class);
-
-    private final static int REGION_SIZE = 1024 * 1024;
+    private final static int MAX_REGION_SIZE = 1 * 1024 * 1024;
     private final static int MAX_CLONED_SIZE = 128 * 1024; // bigger than this don't go in the region
+    private final static int MIN_REGION_SIZE = 8 * 1024;
+
+    private static final IAllocator allocator = DatabaseDescriptor.getoffHeapMemoryAllocator();
 
     // globally stash any Regions we allocate but are beaten to using, and use these up before allocating any more
-    private static final ConcurrentLinkedQueue<Region> RACE_ALLOCATED = new ConcurrentLinkedQueue<>();
+    private static final Map<Integer, RaceAllocated> RACE_ALLOCATED = new HashMap<>();
+
+    static
+    {
+        for(int i = MIN_REGION_SIZE ; i <= MAX_REGION_SIZE; i *= 2)
+            RACE_ALLOCATED.put(i, new RaceAllocated());
+    }
 
     private final AtomicReference<Region> currentRegion = new AtomicReference<>();
-    private final AtomicInteger regionCount = new AtomicInteger(0);
     private final ConcurrentLinkedQueue<Region> regions = new ConcurrentLinkedQueue<>();
-    private AtomicLong unslabbed = new AtomicLong(0);
 
     protected NativeAllocator(NativePool pool)
     {
@@ -98,70 +104,88 @@ public class NativeAllocator extends MemtableAllocator
     public long allocate(int size, OpOrder.Group opGroup)
     {
         assert size >= 0;
-        offHeap().allocate(size, opGroup);
-        // satisfy large allocations directly from JVM since they don't cause fragmentation
-        // as badly, and fill up our regions quickly
         if (size > MAX_CLONED_SIZE)
-        {
-            unslabbed.addAndGet(size);
-            Region region = new Region(unsafe.allocateMemory(size), size);
-            regions.add(region);
-
-            long peer;
-            if ((peer = region.allocate(size)) == -1)
-                throw new AssertionError();
-
-            return peer;
-        }
+            return allocateOversize(size, opGroup);
 
         while (true)
         {
-            Region region = getRegion();
-
+            Region region = currentRegion.get();
             long peer;
-            if ((peer = region.allocate(size)) > 0)
+            if (region != null && (peer = region.allocate(size)) > 0)
                 return peer;
 
-            // not enough space!
-            currentRegion.compareAndSet(region, null);
+            trySwapRegion(region, size);
         }
     }
 
+    private void trySwapRegion(Region current, int minSize)
+    {
+        // decide how big we want the new region to be:
+        //  * if there is no prior region, we set it to min size
+        //  * otherwise we double its size; in either case, if the result cannot fit the allocation, we use 4-8x the allocation's size instead
+        int size;
+        if (current == null) size = MIN_REGION_SIZE;
+        else size = current.capacity * 2;
+        if (minSize > size)
+            size = Integer.highestOneBit(minSize) << 3;
+        size = Math.min(MAX_REGION_SIZE, size);
+
+        // first we try and repurpose a previously allocated region
+        RaceAllocated raceAllocated = RACE_ALLOCATED.get(size);
+        Region next = raceAllocated.poll();
+
+        // if there are none, we allocate one
+        if (next == null)
+            next = new Region(allocator.allocate(size), size);
+
+        // we try to swap in the region we've obtained;
+        // if we fail to swap the region, we try to stash it for repurposing later; if we're out of stash room, we free it
+        if (currentRegion.compareAndSet(current, next))
+            regions.add(next);
+        else if (!raceAllocated.stash(next))
+            allocator.free(next.peer);
+    }
+
+    private long allocateOversize(int size, OpOrder.Group opGroup)
+    {
+        // satisfy large allocations directly from JVM since they don't cause fragmentation
+        // as badly, and fill up our regions quickly
+        offHeap().allocate(size, opGroup);
+        Region region = new Region(allocator.allocate(size), size);
+        regions.add(region);
+
+        long peer;
+        if ((peer = region.allocate(size)) == -1)
+            throw new AssertionError();
+
+        return peer;
+    }
+
     public void setDiscarded()
     {
         for (Region region : regions)
-            unsafe.freeMemory(region.peer);
+            allocator.free(region.peer);
         super.setDiscarded();
     }
 
-    /**
-     * Get the current region, or, if there is no current region, allocate a new one
-     */
-    private Region getRegion()
+    // used to ensure we don't keep loads of race allocated regions around indefinitely. keeps the total bound on wasted memory low.
+    private static class RaceAllocated
     {
-        while (true)
+        final ConcurrentLinkedQueue<Region> stash = new ConcurrentLinkedQueue<>();
+        final Semaphore permits = new Semaphore(8);
+        boolean stash(Region region)
         {
-            // Try to get the region
-            Region region = currentRegion.get();
-            if (region != null)
-                return region;
-
-            // No current region, so we want to allocate one. We race
-            // against other allocators to CAS in a Region, and if we fail we stash the region for re-use
-            region = RACE_ALLOCATED.poll();
-            if (region == null)
-                region = new Region(unsafe.allocateMemory(REGION_SIZE), REGION_SIZE);
-            if (currentRegion.compareAndSet(null, region))
-            {
-                regions.add(region);
-                regionCount.incrementAndGet();
-                logger.trace("{} regions now allocated in {}", regionCount, this);
-                return region;
-            }
-
-            // someone else won race - that's fine, we'll try to grab theirs
-            // in the next iteration of the loop.
-            RACE_ALLOCATED.add(region);
+            if (!permits.tryAcquire())
+                return false;
+            stash.add(region);
+            return true;
+        }
+        Region poll()
+        {
+            Region next = stash.poll();
+            if (next != null)
+                permits.release();
+            return next;
         }
     }
 
@@ -180,7 +204,7 @@ public class NativeAllocator extends MemtableAllocator
          */
         private final long peer;
 
-        private final long capacity;
+        private final int capacity;
 
         /**
          * Offset for the next allocation, or the sentinel value -1
@@ -199,7 +223,7 @@ public class NativeAllocator extends MemtableAllocator
          *
          * @param peer peer
          */
-        private Region(long peer, long capacity)
+        private Region(long peer, int capacity)
         {
             this.peer = peer;
             this.capacity = capacity;
@@ -239,20 +263,4 @@ public class NativeAllocator extends MemtableAllocator
         }
     }
 
-
-    static final Unsafe unsafe;
-
-    static
-    {
-        try
-        {
-            Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
-            field.setAccessible(true);
-            unsafe = (sun.misc.Unsafe) field.get(null);
-        }
-        catch (Exception e)
-        {
-            throw new AssertionError(e);
-        }
-    }
 }