You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2014/12/12 15:19:21 UTC
[1/3] cassandra git commit: Scale memtable slab allocation logarithmically
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 cd714a1de -> 51f7cad48
refs/heads/trunk c2b26e5c6 -> b5795ef96
Scale memtable slab allocation logarithmically
patch by benedict; reviewed by jay patel for CASSANDRA-7882
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/51f7cad4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/51f7cad4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/51f7cad4
Branch: refs/heads/cassandra-2.1
Commit: 51f7cad480c335f07cfd74511c4aae91175b1bb7
Parents: cd714a1
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Dec 12 14:17:04 2014 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Dec 12 14:17:04 2014 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/utils/memory/NativeAllocator.java | 156 ++++++++++---------
2 files changed, 83 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51f7cad4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 641b1f9..2571a09 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.3
+ * Scale memtable slab allocation logarithmically (CASSANDRA-7882)
* cassandra-stress simultaneous inserts over same seed (CASSANDRA-7964)
* Reduce cassandra-stress sampling memory requirements (CASSANDRA-7926)
* Ensure memtable flush cannot expire commit log entries from its future (CASSANDRA-8383)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51f7cad4/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
index 1b5dcf2..0e15ed2 100644
--- a/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
@@ -18,13 +18,15 @@
package org.apache.cassandra.utils.memory;
import java.lang.reflect.Field;
-
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Cell;
import org.apache.cassandra.db.CounterCell;
import org.apache.cassandra.db.DecoratedKey;
@@ -35,25 +37,29 @@ import org.apache.cassandra.db.NativeCounterCell;
import org.apache.cassandra.db.NativeDecoratedKey;
import org.apache.cassandra.db.NativeDeletedCell;
import org.apache.cassandra.db.NativeExpiringCell;
+import org.apache.cassandra.io.util.IAllocator;
import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import sun.misc.Unsafe;
public class NativeAllocator extends MemtableAllocator
{
- private static final Logger logger = LoggerFactory.getLogger(NativeAllocator.class);
-
- private final static int REGION_SIZE = 1024 * 1024;
+ private final static int MAX_REGION_SIZE = 1 * 1024 * 1024;
private final static int MAX_CLONED_SIZE = 128 * 1024; // bigger than this don't go in the region
+ private final static int MIN_REGION_SIZE = 8 * 1024;
+
+ private static final IAllocator allocator = DatabaseDescriptor.getoffHeapMemoryAllocator();
// globally stash any Regions we allocate but are beaten to using, and use these up before allocating any more
- private static final ConcurrentLinkedQueue<Region> RACE_ALLOCATED = new ConcurrentLinkedQueue<>();
+ private static final Map<Integer, RaceAllocated> RACE_ALLOCATED = new HashMap<>();
+
+ static
+ {
+ for(int i = MIN_REGION_SIZE ; i <= MAX_REGION_SIZE; i *= 2)
+ RACE_ALLOCATED.put(i, new RaceAllocated());
+ }
private final AtomicReference<Region> currentRegion = new AtomicReference<>();
- private final AtomicInteger regionCount = new AtomicInteger(0);
private final ConcurrentLinkedQueue<Region> regions = new ConcurrentLinkedQueue<>();
- private AtomicLong unslabbed = new AtomicLong(0);
protected NativeAllocator(NativePool pool)
{
@@ -98,70 +104,88 @@ public class NativeAllocator extends MemtableAllocator
public long allocate(int size, OpOrder.Group opGroup)
{
assert size >= 0;
- offHeap().allocate(size, opGroup);
- // satisfy large allocations directly from JVM since they don't cause fragmentation
- // as badly, and fill up our regions quickly
if (size > MAX_CLONED_SIZE)
- {
- unslabbed.addAndGet(size);
- Region region = new Region(unsafe.allocateMemory(size), size);
- regions.add(region);
-
- long peer;
- if ((peer = region.allocate(size)) == -1)
- throw new AssertionError();
-
- return peer;
- }
+ return allocateOversize(size, opGroup);
while (true)
{
- Region region = getRegion();
-
+ Region region = currentRegion.get();
long peer;
- if ((peer = region.allocate(size)) > 0)
+ if (region != null && (peer = region.allocate(size)) > 0)
return peer;
- // not enough space!
- currentRegion.compareAndSet(region, null);
+ trySwapRegion(region, size);
}
}
+ private void trySwapRegion(Region current, int minSize)
+ {
+ // decide how big we want the new region to be:
+ // * if there is no prior region, we set it to min size
+ // * otherwise we double its size; if it's too small to fit the allocation, we round it up to 4-8x its size
+ int size;
+ if (current == null) size = MIN_REGION_SIZE;
+ else size = current.capacity * 2;
+ if (minSize > size)
+ size = Integer.highestOneBit(minSize) << 3;
+ size = Math.min(MAX_REGION_SIZE, size);
+
+ // first we try and repurpose a previously allocated region
+ RaceAllocated raceAllocated = RACE_ALLOCATED.get(size);
+ Region next = raceAllocated.poll();
+
+ // if there are none, we allocate one
+ if (next == null)
+ next = new Region(allocator.allocate(size), size);
+
+ // we try to swap in the region we've obtained;
+ // if we fail to swap the region, we try to stash it for repurposing later; if we're out of stash room, we free it
+ if (currentRegion.compareAndSet(current, next))
+ regions.add(next);
+ else if (!raceAllocated.stash(next))
+ allocator.free(next.peer);
+ }
+
+ private long allocateOversize(int size, OpOrder.Group opGroup)
+ {
+ // satisfy large allocations directly from JVM since they don't cause fragmentation
+ // as badly, and fill up our regions quickly
+ offHeap().allocate(size, opGroup);
+ Region region = new Region(allocator.allocate(size), size);
+ regions.add(region);
+
+ long peer;
+ if ((peer = region.allocate(size)) == -1)
+ throw new AssertionError();
+
+ return peer;
+ }
+
public void setDiscarded()
{
for (Region region : regions)
- unsafe.freeMemory(region.peer);
+ allocator.free(region.peer);
super.setDiscarded();
}
- /**
- * Get the current region, or, if there is no current region, allocate a new one
- */
- private Region getRegion()
+ // used to ensure we don't keep loads of race allocated regions around indefinitely. keeps the total bound on wasted memory low.
+ private static class RaceAllocated
{
- while (true)
+ final ConcurrentLinkedQueue<Region> stash = new ConcurrentLinkedQueue<>();
+ final Semaphore permits = new Semaphore(8);
+ boolean stash(Region region)
{
- // Try to get the region
- Region region = currentRegion.get();
- if (region != null)
- return region;
-
- // No current region, so we want to allocate one. We race
- // against other allocators to CAS in a Region, and if we fail we stash the region for re-use
- region = RACE_ALLOCATED.poll();
- if (region == null)
- region = new Region(unsafe.allocateMemory(REGION_SIZE), REGION_SIZE);
- if (currentRegion.compareAndSet(null, region))
- {
- regions.add(region);
- regionCount.incrementAndGet();
- logger.trace("{} regions now allocated in {}", regionCount, this);
- return region;
- }
-
- // someone else won race - that's fine, we'll try to grab theirs
- // in the next iteration of the loop.
- RACE_ALLOCATED.add(region);
+ if (!permits.tryAcquire())
+ return false;
+ stash.add(region);
+ return true;
+ }
+ Region poll()
+ {
+ Region next = stash.poll();
+ if (next != null)
+ permits.release();
+ return next;
}
}
@@ -180,7 +204,7 @@ public class NativeAllocator extends MemtableAllocator
*/
private final long peer;
- private final long capacity;
+ private final int capacity;
/**
* Offset for the next allocation, or the sentinel value -1
@@ -199,7 +223,7 @@ public class NativeAllocator extends MemtableAllocator
*
* @param peer peer
*/
- private Region(long peer, long capacity)
+ private Region(long peer, int capacity)
{
this.peer = peer;
this.capacity = capacity;
@@ -239,20 +263,4 @@ public class NativeAllocator extends MemtableAllocator
}
}
-
- static final Unsafe unsafe;
-
- static
- {
- try
- {
- Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
- field.setAccessible(true);
- unsafe = (sun.misc.Unsafe) field.get(null);
- }
- catch (Exception e)
- {
- throw new AssertionError(e);
- }
- }
}
[2/3] cassandra git commit: Scale memtable slab allocation logarithmically
Posted by be...@apache.org.
Scale memtable slab allocation logarithmically
patch by benedict; reviewed by jay patel for CASSANDRA-7882
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/51f7cad4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/51f7cad4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/51f7cad4
Branch: refs/heads/trunk
Commit: 51f7cad480c335f07cfd74511c4aae91175b1bb7
Parents: cd714a1
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Dec 12 14:17:04 2014 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Dec 12 14:17:04 2014 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/utils/memory/NativeAllocator.java | 156 ++++++++++---------
2 files changed, 83 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51f7cad4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 641b1f9..2571a09 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.3
+ * Scale memtable slab allocation logarithmically (CASSANDRA-7882)
* cassandra-stress simultaneous inserts over same seed (CASSANDRA-7964)
* Reduce cassandra-stress sampling memory requirements (CASSANDRA-7926)
* Ensure memtable flush cannot expire commit log entries from its future (CASSANDRA-8383)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51f7cad4/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
index 1b5dcf2..0e15ed2 100644
--- a/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
@@ -18,13 +18,15 @@
package org.apache.cassandra.utils.memory;
import java.lang.reflect.Field;
-
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Cell;
import org.apache.cassandra.db.CounterCell;
import org.apache.cassandra.db.DecoratedKey;
@@ -35,25 +37,29 @@ import org.apache.cassandra.db.NativeCounterCell;
import org.apache.cassandra.db.NativeDecoratedKey;
import org.apache.cassandra.db.NativeDeletedCell;
import org.apache.cassandra.db.NativeExpiringCell;
+import org.apache.cassandra.io.util.IAllocator;
import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import sun.misc.Unsafe;
public class NativeAllocator extends MemtableAllocator
{
- private static final Logger logger = LoggerFactory.getLogger(NativeAllocator.class);
-
- private final static int REGION_SIZE = 1024 * 1024;
+ private final static int MAX_REGION_SIZE = 1 * 1024 * 1024;
private final static int MAX_CLONED_SIZE = 128 * 1024; // bigger than this don't go in the region
+ private final static int MIN_REGION_SIZE = 8 * 1024;
+
+ private static final IAllocator allocator = DatabaseDescriptor.getoffHeapMemoryAllocator();
// globally stash any Regions we allocate but are beaten to using, and use these up before allocating any more
- private static final ConcurrentLinkedQueue<Region> RACE_ALLOCATED = new ConcurrentLinkedQueue<>();
+ private static final Map<Integer, RaceAllocated> RACE_ALLOCATED = new HashMap<>();
+
+ static
+ {
+ for(int i = MIN_REGION_SIZE ; i <= MAX_REGION_SIZE; i *= 2)
+ RACE_ALLOCATED.put(i, new RaceAllocated());
+ }
private final AtomicReference<Region> currentRegion = new AtomicReference<>();
- private final AtomicInteger regionCount = new AtomicInteger(0);
private final ConcurrentLinkedQueue<Region> regions = new ConcurrentLinkedQueue<>();
- private AtomicLong unslabbed = new AtomicLong(0);
protected NativeAllocator(NativePool pool)
{
@@ -98,70 +104,88 @@ public class NativeAllocator extends MemtableAllocator
public long allocate(int size, OpOrder.Group opGroup)
{
assert size >= 0;
- offHeap().allocate(size, opGroup);
- // satisfy large allocations directly from JVM since they don't cause fragmentation
- // as badly, and fill up our regions quickly
if (size > MAX_CLONED_SIZE)
- {
- unslabbed.addAndGet(size);
- Region region = new Region(unsafe.allocateMemory(size), size);
- regions.add(region);
-
- long peer;
- if ((peer = region.allocate(size)) == -1)
- throw new AssertionError();
-
- return peer;
- }
+ return allocateOversize(size, opGroup);
while (true)
{
- Region region = getRegion();
-
+ Region region = currentRegion.get();
long peer;
- if ((peer = region.allocate(size)) > 0)
+ if (region != null && (peer = region.allocate(size)) > 0)
return peer;
- // not enough space!
- currentRegion.compareAndSet(region, null);
+ trySwapRegion(region, size);
}
}
+ private void trySwapRegion(Region current, int minSize)
+ {
+ // decide how big we want the new region to be:
+ // * if there is no prior region, we set it to min size
+ // * otherwise we double its size; if it's too small to fit the allocation, we round it up to 4-8x its size
+ int size;
+ if (current == null) size = MIN_REGION_SIZE;
+ else size = current.capacity * 2;
+ if (minSize > size)
+ size = Integer.highestOneBit(minSize) << 3;
+ size = Math.min(MAX_REGION_SIZE, size);
+
+ // first we try and repurpose a previously allocated region
+ RaceAllocated raceAllocated = RACE_ALLOCATED.get(size);
+ Region next = raceAllocated.poll();
+
+ // if there are none, we allocate one
+ if (next == null)
+ next = new Region(allocator.allocate(size), size);
+
+ // we try to swap in the region we've obtained;
+ // if we fail to swap the region, we try to stash it for repurposing later; if we're out of stash room, we free it
+ if (currentRegion.compareAndSet(current, next))
+ regions.add(next);
+ else if (!raceAllocated.stash(next))
+ allocator.free(next.peer);
+ }
+
+ private long allocateOversize(int size, OpOrder.Group opGroup)
+ {
+ // satisfy large allocations directly from JVM since they don't cause fragmentation
+ // as badly, and fill up our regions quickly
+ offHeap().allocate(size, opGroup);
+ Region region = new Region(allocator.allocate(size), size);
+ regions.add(region);
+
+ long peer;
+ if ((peer = region.allocate(size)) == -1)
+ throw new AssertionError();
+
+ return peer;
+ }
+
public void setDiscarded()
{
for (Region region : regions)
- unsafe.freeMemory(region.peer);
+ allocator.free(region.peer);
super.setDiscarded();
}
- /**
- * Get the current region, or, if there is no current region, allocate a new one
- */
- private Region getRegion()
+ // used to ensure we don't keep loads of race allocated regions around indefinitely. keeps the total bound on wasted memory low.
+ private static class RaceAllocated
{
- while (true)
+ final ConcurrentLinkedQueue<Region> stash = new ConcurrentLinkedQueue<>();
+ final Semaphore permits = new Semaphore(8);
+ boolean stash(Region region)
{
- // Try to get the region
- Region region = currentRegion.get();
- if (region != null)
- return region;
-
- // No current region, so we want to allocate one. We race
- // against other allocators to CAS in a Region, and if we fail we stash the region for re-use
- region = RACE_ALLOCATED.poll();
- if (region == null)
- region = new Region(unsafe.allocateMemory(REGION_SIZE), REGION_SIZE);
- if (currentRegion.compareAndSet(null, region))
- {
- regions.add(region);
- regionCount.incrementAndGet();
- logger.trace("{} regions now allocated in {}", regionCount, this);
- return region;
- }
-
- // someone else won race - that's fine, we'll try to grab theirs
- // in the next iteration of the loop.
- RACE_ALLOCATED.add(region);
+ if (!permits.tryAcquire())
+ return false;
+ stash.add(region);
+ return true;
+ }
+ Region poll()
+ {
+ Region next = stash.poll();
+ if (next != null)
+ permits.release();
+ return next;
}
}
@@ -180,7 +204,7 @@ public class NativeAllocator extends MemtableAllocator
*/
private final long peer;
- private final long capacity;
+ private final int capacity;
/**
* Offset for the next allocation, or the sentinel value -1
@@ -199,7 +223,7 @@ public class NativeAllocator extends MemtableAllocator
*
* @param peer peer
*/
- private Region(long peer, long capacity)
+ private Region(long peer, int capacity)
{
this.peer = peer;
this.capacity = capacity;
@@ -239,20 +263,4 @@ public class NativeAllocator extends MemtableAllocator
}
}
-
- static final Unsafe unsafe;
-
- static
- {
- try
- {
- Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
- field.setAccessible(true);
- unsafe = (sun.misc.Unsafe) field.get(null);
- }
- catch (Exception e)
- {
- throw new AssertionError(e);
- }
- }
}
[3/3] cassandra git commit: Merge branch 'cassandra-2.1' into trunk
Posted by be...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Conflicts:
src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b5795ef9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b5795ef9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b5795ef9
Branch: refs/heads/trunk
Commit: b5795ef96496ab8495eecfa84ce0ea07d9f29fe0
Parents: c2b26e5 51f7cad
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Dec 12 14:19:05 2014 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Dec 12 14:19:05 2014 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/utils/memory/NativeAllocator.java | 131 +++++++++++--------
2 files changed, 76 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5795ef9/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 8881bef,2571a09..cf2a3fd
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,45 -1,5 +1,46 @@@
+3.0
+ * Support for user-defined aggregation functions (CASSANDRA-8053)
+ * Fix NPE in SelectStatement with empty IN values (CASSANDRA-8419)
+ * Refactor SelectStatement, return IN results in natural order instead
+ of IN value list order (CASSANDRA-7981)
+ * Support UDTs, tuples, and collections in user-defined
+ functions (CASSANDRA-7563)
+ * Fix aggregate fn results on empty selection, result column name,
+ and cqlsh parsing (CASSANDRA-8229)
+ * Mark sstables as repaired after full repair (CASSANDRA-7586)
+ * Extend Descriptor to include a format value and refactor reader/writer apis (CASSANDRA-7443)
+ * Integrate JMH for microbenchmarks (CASSANDRA-8151)
+ * Keep sstable levels when bootstrapping (CASSANDRA-7460)
+ * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838)
+ * Support for aggregation functions (CASSANDRA-4914)
+ * Remove cassandra-cli (CASSANDRA-7920)
+ * Accept dollar quoted strings in CQL (CASSANDRA-7769)
+ * Make assassinate a first class command (CASSANDRA-7935)
+ * Support IN clause on any clustering column (CASSANDRA-4762)
+ * Improve compaction logging (CASSANDRA-7818)
+ * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917)
+ * Do anticompaction in groups (CASSANDRA-6851)
+ * Support pure user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929,
+ 7924, 7812, 8063, 7813)
+ * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
+ * Move sstable RandomAccessReader to nio2, which allows using the
+ FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
+ * Remove CQL2 (CASSANDRA-5918)
+ * Add Thrift get_multi_slice call (CASSANDRA-6757)
+ * Optimize fetching multiple cells by name (CASSANDRA-6933)
+ * Allow compilation in java 8 (CASSANDRA-7028)
+ * Make incremental repair default (CASSANDRA-7250)
+ * Enable code coverage thru JaCoCo (CASSANDRA-7226)
+ * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369)
+ * Shorten SSTable path (CASSANDRA-6962)
+ * Use unsafe mutations for most unit tests (CASSANDRA-6969)
+ * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
+ * Fail on very large batch sizes (CASSANDRA-8011)
+ * Improve concurrency of repair (CASSANDRA-6455, 8208)
+
+
2.1.3
+ * Scale memtable slab allocation logarithmically (CASSANDRA-7882)
* cassandra-stress simultaneous inserts over same seed (CASSANDRA-7964)
* Reduce cassandra-stress sampling memory requirements (CASSANDRA-7926)
* Ensure memtable flush cannot expire commit log entries from its future (CASSANDRA-8383)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5795ef9/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
index ccb1104,0e15ed2..3c43a27
--- a/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
@@@ -17,9 -17,12 +17,9 @@@
*/
package org.apache.cassandra.utils.memory;
-import java.lang.reflect.Field;
-import java.util.HashMap;
-import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
+ import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
- import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.cassandra.config.CFMetaData;
@@@ -36,25 -39,27 +36,26 @@@ import org.apache.cassandra.db.NativeDe
import org.apache.cassandra.db.NativeExpiringCell;
import org.apache.cassandra.io.util.IAllocator;
import org.apache.cassandra.utils.concurrent.OpOrder;
public class NativeAllocator extends MemtableAllocator
{
- private static final Logger logger = LoggerFactory.getLogger(NativeAllocator.class);
-
- private final static int REGION_SIZE = 1024 * 1024;
+ private final static int MAX_REGION_SIZE = 1 * 1024 * 1024;
private final static int MAX_CLONED_SIZE = 128 * 1024; // bigger than this don't go in the region
+ private final static int MIN_REGION_SIZE = 8 * 1024;
private static final IAllocator allocator = DatabaseDescriptor.getoffHeapMemoryAllocator();
-
+
// globally stash any Regions we allocate but are beaten to using, and use these up before allocating any more
- private static final ConcurrentLinkedQueue<Region> RACE_ALLOCATED = new ConcurrentLinkedQueue<>();
+ private static final Map<Integer, RaceAllocated> RACE_ALLOCATED = new HashMap<>();
+
+ static
+ {
+ for(int i = MIN_REGION_SIZE ; i <= MAX_REGION_SIZE; i *= 2)
+ RACE_ALLOCATED.put(i, new RaceAllocated());
+ }
private final AtomicReference<Region> currentRegion = new AtomicReference<>();
- private final AtomicInteger regionCount = new AtomicInteger(0);
private final ConcurrentLinkedQueue<Region> regions = new ConcurrentLinkedQueue<>();
- private AtomicLong unslabbed = new AtomicLong(0);
protected NativeAllocator(NativePool pool)
{