You are viewing a plain-text version of this content; the link to the canonical version ("here") was not preserved in this rendering.
Posted to commits@cassandra.apache.org by be...@apache.org on 2014/12/12 15:19:22 UTC
[2/3] cassandra git commit: Scale memtable slab allocation logarithmically
Scale memtable slab allocation logarithmically
patch by benedict; reviewed by jay patel for CASSANDRA-7882
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/51f7cad4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/51f7cad4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/51f7cad4
Branch: refs/heads/trunk
Commit: 51f7cad480c335f07cfd74511c4aae91175b1bb7
Parents: cd714a1
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Dec 12 14:17:04 2014 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Dec 12 14:17:04 2014 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/utils/memory/NativeAllocator.java | 156 ++++++++++---------
2 files changed, 83 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51f7cad4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 641b1f9..2571a09 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.3
+ * Scale memtable slab allocation logarithmically (CASSANDRA-7882)
* cassandra-stress simultaneous inserts over same seed (CASSANDRA-7964)
* Reduce cassandra-stress sampling memory requirements (CASSANDRA-7926)
* Ensure memtable flush cannot expire commit log entries from its future (CASSANDRA-8383)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51f7cad4/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
index 1b5dcf2..0e15ed2 100644
--- a/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
@@ -18,13 +18,15 @@
package org.apache.cassandra.utils.memory;
import java.lang.reflect.Field;
-
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Cell;
import org.apache.cassandra.db.CounterCell;
import org.apache.cassandra.db.DecoratedKey;
@@ -35,25 +37,29 @@ import org.apache.cassandra.db.NativeCounterCell;
import org.apache.cassandra.db.NativeDecoratedKey;
import org.apache.cassandra.db.NativeDeletedCell;
import org.apache.cassandra.db.NativeExpiringCell;
+import org.apache.cassandra.io.util.IAllocator;
import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import sun.misc.Unsafe;
public class NativeAllocator extends MemtableAllocator
{
- private static final Logger logger = LoggerFactory.getLogger(NativeAllocator.class);
-
- private final static int REGION_SIZE = 1024 * 1024;
+ private final static int MAX_REGION_SIZE = 1 * 1024 * 1024;
private final static int MAX_CLONED_SIZE = 128 * 1024; // bigger than this don't go in the region
+ private final static int MIN_REGION_SIZE = 8 * 1024;
+
+ private static final IAllocator allocator = DatabaseDescriptor.getoffHeapMemoryAllocator();
// globally stash any Regions we allocate but are beaten to using, and use these up before allocating any more
- private static final ConcurrentLinkedQueue<Region> RACE_ALLOCATED = new ConcurrentLinkedQueue<>();
+ private static final Map<Integer, RaceAllocated> RACE_ALLOCATED = new HashMap<>();
+
+ static
+ {
+ for(int i = MIN_REGION_SIZE ; i <= MAX_REGION_SIZE; i *= 2)
+ RACE_ALLOCATED.put(i, new RaceAllocated());
+ }
private final AtomicReference<Region> currentRegion = new AtomicReference<>();
- private final AtomicInteger regionCount = new AtomicInteger(0);
private final ConcurrentLinkedQueue<Region> regions = new ConcurrentLinkedQueue<>();
- private AtomicLong unslabbed = new AtomicLong(0);
protected NativeAllocator(NativePool pool)
{
@@ -98,70 +104,88 @@ public class NativeAllocator extends MemtableAllocator
public long allocate(int size, OpOrder.Group opGroup)
{
assert size >= 0;
offHeap().allocate(size, opGroup);
- // satisfy large allocations directly from JVM since they don't cause fragmentation
- // as badly, and fill up our regions quickly
if (size > MAX_CLONED_SIZE)
- {
- unslabbed.addAndGet(size);
- Region region = new Region(unsafe.allocateMemory(size), size);
- regions.add(region);
-
- long peer;
- if ((peer = region.allocate(size)) == -1)
- throw new AssertionError();
-
- return peer;
- }
+ return allocateOversize(size); // already charged to offHeap() above, for every allocation size
while (true)
{
- Region region = getRegion();
-
+ Region region = currentRegion.get();
long peer;
- if ((peer = region.allocate(size)) > 0)
+ if (region != null && (peer = region.allocate(size)) > 0)
return peer;
- // not enough space!
- currentRegion.compareAndSet(region, null);
+ trySwapRegion(region, size);
}
}
+ private void trySwapRegion(Region current, int minSize)
+ {
+ // decide how big we want the new region to be:
+ // * if there is no prior region, we set it to min size
+ // * otherwise we double its size; if that can't fit the allocation, we use 4-8x the allocation size; always capped at MAX_REGION_SIZE
+ int size;
+ if (current == null) size = MIN_REGION_SIZE;
+ else size = current.capacity * 2;
+ if (minSize > size)
+ size = Integer.highestOneBit(minSize) << 3;
+ size = Math.min(MAX_REGION_SIZE, size);
+
+ // first we try and repurpose a previously allocated region
+ RaceAllocated raceAllocated = RACE_ALLOCATED.get(size);
+ Region next = raceAllocated.poll();
+
+ // if there are none, we allocate one
+ if (next == null)
+ next = new Region(allocator.allocate(size), size);
+
+ // we try to swap in the region we've obtained;
+ // if we fail to swap the region, we try to stash it for repurposing later; if we're out of stash room, we free it
+ if (currentRegion.compareAndSet(current, next))
+ regions.add(next);
+ else if (!raceAllocated.stash(next))
+ allocator.free(next.peer);
+ }
+
+ private long allocateOversize(int size)
+ {
+ // satisfy large allocations directly from the off-heap allocator rather than a shared region, since they don't
+ // cause fragmentation as badly and would fill up our regions quickly; the caller has already accounted for them
+ Region region = new Region(allocator.allocate(size), size);
+ regions.add(region);
+
+ long peer;
+ if ((peer = region.allocate(size)) == -1)
+ throw new AssertionError();
+
+ return peer;
+ }
+
public void setDiscarded()
{
for (Region region : regions)
- unsafe.freeMemory(region.peer);
+ allocator.free(region.peer);
super.setDiscarded();
}
- /**
- * Get the current region, or, if there is no current region, allocate a new one
- */
- private Region getRegion()
+ // used to ensure we don't keep loads of race allocated regions around indefinitely. keeps the total bound on wasted memory low.
+ private static class RaceAllocated
{
- while (true)
+ final ConcurrentLinkedQueue<Region> stash = new ConcurrentLinkedQueue<>();
+ final Semaphore permits = new Semaphore(8);
+ boolean stash(Region region)
{
- // Try to get the region
- Region region = currentRegion.get();
- if (region != null)
- return region;
-
- // No current region, so we want to allocate one. We race
- // against other allocators to CAS in a Region, and if we fail we stash the region for re-use
- region = RACE_ALLOCATED.poll();
- if (region == null)
- region = new Region(unsafe.allocateMemory(REGION_SIZE), REGION_SIZE);
- if (currentRegion.compareAndSet(null, region))
- {
- regions.add(region);
- regionCount.incrementAndGet();
- logger.trace("{} regions now allocated in {}", regionCount, this);
- return region;
- }
-
- // someone else won race - that's fine, we'll try to grab theirs
- // in the next iteration of the loop.
- RACE_ALLOCATED.add(region);
+ if (!permits.tryAcquire())
+ return false;
+ stash.add(region);
+ return true;
+ }
+ Region poll()
+ {
+ Region next = stash.poll();
+ if (next != null)
+ permits.release();
+ return next;
}
}
@@ -180,7 +204,7 @@ public class NativeAllocator extends MemtableAllocator
*/
private final long peer;
- private final long capacity;
+ private final int capacity;
/**
* Offset for the next allocation, or the sentinel value -1
@@ -199,7 +223,7 @@ public class NativeAllocator extends MemtableAllocator
*
* @param peer peer
*/
- private Region(long peer, long capacity)
+ private Region(long peer, int capacity)
{
this.peer = peer;
this.capacity = capacity;
@@ -239,20 +263,4 @@ public class NativeAllocator extends MemtableAllocator
}
}
-
- static final Unsafe unsafe;
-
- static
- {
- try
- {
- Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
- field.setAccessible(true);
- unsafe = (sun.misc.Unsafe) field.get(null);
- }
- catch (Exception e)
- {
- throw new AssertionError(e);
- }
- }
}