You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directmemory.apache.org by bp...@apache.org on 2012/03/01 12:41:36 UTC
svn commit: r1295522 [1/2] - in /incubator/directmemory/trunk:
directmemory-cache/src/main/java/org/apache/directmemory/cache/
directmemory-cache/src/main/java/org/apache/directmemory/memory/
directmemory-cache/src/main/java/org/apache/directmemory/mem...
Author: bperroud
Date: Thu Mar 1 11:41:34 2012
New Revision: 1295522
URL: http://svn.apache.org/viewvc?rev=1295522&view=rev
Log:
DIRECTMEMORY-40, DIRECTMEMORY-60 : separate responsabilities more clearly, set allocation with merging pointers as the default one, add a SLAB's style allocator (fixed buffer size), mark OffHeapMemoryBuffer as deprecated
Added:
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/AbstractByteBufferAllocator.java
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/ByteBufferAllocator.java
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/DirectByteBufferUtils.java
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/FixedSizeByteBufferAllocatorImpl.java
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/MergingByteBufferAllocatorImpl.java
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/SlabByteBufferAllocatorImpl.java
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/allocator/
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/allocator/FixedSizeByteBufferAllocatorImplTest.java
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/allocator/MergingByteBufferAllocatorImplTest.java
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/allocator/SlabByteBufferAllocatorImplTest.java
Removed:
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMergingMemoryBufferImpl.java
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/MemoryManagerServiceImplWithMerginOHMBAndAllocationPolicyTest.java
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/MemoryManagerServiceImplWithMerginOHMBTest.java
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/OffHeapMergingMemoryBufferTest.java
Modified:
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheServiceImpl.java
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/AllocationPolicy.java
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManager.java
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerService.java
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceWithAllocationPolicyImpl.java
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMemoryBuffer.java
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMemoryBufferImpl.java
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/PointerImpl.java
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/RoundRobinAllocationPolicy.java
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/cache/CacheServiceImplTest.java
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/AbstractOffHeapMemoryBufferTest.java
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/Concurrent2Test.java
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/Concurrent3Test.java
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/MemoryManagerServiceImplTest.java
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/MemoryManagerTest.java
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/RoundRobinAllocationPolicyTest.java
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/Starter.java
incubator/directmemory/trunk/integrations/ehcache/src/main/java/org/apache/directmemory/ehcache/DirectMemoryCache.java
incubator/directmemory/trunk/integrations/ehcache/src/main/java/org/apache/directmemory/ehcache/DirectMemoryStore.java
incubator/directmemory/trunk/integrations/ehcache/src/test/java/org/apache/directmemory/ehcache/EHCacheTest.java
incubator/directmemory/trunk/itests/osgi/src/test/java/org/apache/directmemory/tests/osgi/cache/CacheServiceExportingActivator.java
Modified: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheServiceImpl.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheServiceImpl.java?rev=1295522&r1=1295521&r2=1295522&view=diff
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheServiceImpl.java (original)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheServiceImpl.java Thu Mar 1 11:41:34 2012
@@ -295,7 +295,17 @@ public class CacheServiceImpl<K, V>
logger.info( format( "off-heap - buffer: \t%1d", mem.getBufferNumber() ) );
logger.info( format( "off-heap - allocated: \t%1s", Ram.inMb( mem.capacity() ) ) );
logger.info( format( "off-heap - used: \t%1s", Ram.inMb( mem.used() ) ) );
- logger.info( format( "heap - max: \t%1s", Ram.inMb( Runtime.getRuntime().maxMemory() ) ) );
+ logger.info( format( "heap - max: \t%1s", Ram.inMb( Runtime.getRuntime().maxMemory() ) ) );
+ logger.info( format( "heap - allocated: \t%1s", Ram.inMb( Runtime.getRuntime().totalMemory() ) ) );
+ logger.info( format( "heap - free : \t%1s", Ram.inMb( Runtime.getRuntime().freeMemory() ) ) );
+ logger.info( "************************************************" );
+ }
+
+ public void dump( MemoryManagerService<V> mms )
+ {
+ logger.info( format( "off-heap - allocated: \t%1s", Ram.inMb( mms.capacity() ) ) );
+ logger.info( format( "off-heap - used: \t%1s", Ram.inMb( mms.used() ) ) );
+ logger.info( format( "heap - max: \t%1s", Ram.inMb( Runtime.getRuntime().maxMemory() ) ) );
logger.info( format( "heap - allocated: \t%1s", Ram.inMb( Runtime.getRuntime().totalMemory() ) ) );
logger.info( format( "heap - free : \t%1s", Ram.inMb( Runtime.getRuntime().freeMemory() ) ) );
logger.info( "************************************************" );
@@ -311,10 +321,7 @@ public class CacheServiceImpl<K, V>
logger.info( "*** DirectMemory statistics ********************" );
- for ( OffHeapMemoryBuffer<V> mem : memoryManager.getBuffers() )
- {
- dump( mem );
- }
+ dump( memoryManager );
}
@Override
Modified: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/AllocationPolicy.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/AllocationPolicy.java?rev=1295522&r1=1295521&r2=1295522&view=diff
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/AllocationPolicy.java (original)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/AllocationPolicy.java Thu Mar 1 11:41:34 2012
@@ -21,6 +21,8 @@ package org.apache.directmemory.memory;
import java.util.List;
+import org.apache.directmemory.memory.allocator.ByteBufferAllocator;
+
/**
* Interface describing the buffer allocation policy.
* The implementations will be initialized by setting the list of buffers {@link #init(List)},
@@ -30,7 +32,7 @@ import java.util.List;
* @author bperroud
*
*/
-public interface AllocationPolicy<T>
+public interface AllocationPolicy
{
/**
@@ -38,16 +40,16 @@ public interface AllocationPolicy<T>
*
* @param buffers
*/
- void init( List<OffHeapMemoryBuffer<T>> buffers );
+ void init( List<ByteBufferAllocator> allocators );
/**
- * Returns the active buffer in which to allocate.
+ * Returns the {@link ByteBufferAllocator} to use to allocate.
*
- * @param previouslyAllocatedBuffer : the previously allocated buffer, or null if it's the first allocation
+ * @param previousAllocator : the previously used {@link ByteBufferAllocator}, or null if it's the first allocation
* @param allocationNumber : the number of time the allocation has already failed.
- * @return the buffer to allocate, or null if allocation has failed.
+ * @return the {@link ByteBufferAllocator} to use, or null if allocation has failed.
*/
- OffHeapMemoryBuffer<T> getActiveBuffer( OffHeapMemoryBuffer<T> previouslyAllocatedBuffer, int allocationNumber );
+ ByteBufferAllocator getActiveAllocator( ByteBufferAllocator previousAllocator, int allocationNumber );
/**
* Reset internal state
Modified: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManager.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManager.java?rev=1295522&r1=1295521&r2=1295522&view=diff
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManager.java (original)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManager.java Thu Mar 1 11:41:34 2012
@@ -22,8 +22,6 @@ package org.apache.directmemory.memory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
-
public class MemoryManager
{
private static Logger logger = LoggerFactory.getLogger( MemoryManager.class );
@@ -85,17 +83,6 @@ public class MemoryManager
memoryManager.collectLFU();
}
- public static List<OffHeapMemoryBuffer<Object>> getBuffers()
- {
- return memoryManager.getBuffers();
- }
-
-
- public static OffHeapMemoryBuffer<Object> getActiveBuffer()
- {
- return memoryManager.getActiveBuffer();
- }
-
public static MemoryManagerService<Object> getMemoryManager()
{
return memoryManager;
Modified: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerService.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerService.java?rev=1295522&r1=1295521&r2=1295522&view=diff
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerService.java (original)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerService.java Thu Mar 1 11:41:34 2012
@@ -19,7 +19,6 @@ package org.apache.directmemory.memory;
* under the License.
*/
-import java.util.List;
public interface MemoryManagerService<V>
{
@@ -79,14 +78,12 @@ public interface MemoryManagerService<V>
long capacity();
+ long used();
+
long collectExpired();
void collectLFU();
- List<OffHeapMemoryBuffer<V>> getBuffers();
-
- OffHeapMemoryBuffer<V> getActiveBuffer();
-
<T extends V> Pointer<V> allocate( Class<T> type, int size, long expiresIn, long expires );
-
+
}
Modified: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java?rev=1295522&r1=1295521&r2=1295522&view=diff
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java (original)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java Thu Mar 1 11:41:34 2012
@@ -19,144 +19,352 @@ package org.apache.directmemory.memory;
* under the License.
*/
+import static com.google.common.collect.Iterables.filter;
+import static com.google.common.collect.Iterables.limit;
+import static com.google.common.collect.Ordering.from;
import static java.lang.String.format;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.directmemory.measures.Ram;
+import org.apache.directmemory.memory.allocator.ByteBufferAllocator;
+import org.apache.directmemory.memory.allocator.MergingByteBufferAllocatorImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Predicate;
+
public class MemoryManagerServiceImpl<V>
implements MemoryManagerService<V>
{
+ protected static final long NEVER_EXPIRES = 0L;
+
protected static Logger logger = LoggerFactory.getLogger( MemoryManager.class );
- protected List<OffHeapMemoryBuffer<V>> buffers = new ArrayList<OffHeapMemoryBuffer<V>>();
+ private List<ByteBufferAllocator> allocators;
- protected int activeBufferIndex = 0;
+ private final Set<Pointer<V>> pointers = Collections.newSetFromMap( new ConcurrentHashMap<Pointer<V>, Boolean>() );
+
+ protected int activeAllocatorIndex = 0;
+
+ private final boolean returnNullWhenFull;
+
+ protected final AtomicLong used = new AtomicLong( 0L );
public MemoryManagerServiceImpl()
{
+ this( true );
+ }
+
+ public MemoryManagerServiceImpl( final boolean returnNullWhenFull )
+ {
+ this.returnNullWhenFull = returnNullWhenFull;
}
+ @Override
public void init( int numberOfBuffers, int size )
{
- buffers = new ArrayList<OffHeapMemoryBuffer<V>>( numberOfBuffers );
+ allocators = new ArrayList<ByteBufferAllocator>( numberOfBuffers );
+
for ( int i = 0; i < numberOfBuffers; i++ )
{
- final OffHeapMemoryBuffer<V> offHeapMemoryBuffer = instanciateOffHeapMemoryBuffer( size, i );
- buffers.add( offHeapMemoryBuffer );
+ final ByteBufferAllocator allocator = instanciateByteBufferAllocator( i, size );
+ allocators.add( allocator );
}
logger.info( format( "MemoryManager initialized - %d buffers, %s each", numberOfBuffers, Ram.inMb( size ) ) );
}
- protected OffHeapMemoryBuffer<V> instanciateOffHeapMemoryBuffer( int size, int bufferNumber )
+
+ protected ByteBufferAllocator instanciateByteBufferAllocator( final int allocatorNumber, final int size )
{
- return OffHeapMemoryBufferImpl.createNew( size, bufferNumber );
+ final MergingByteBufferAllocatorImpl allocator = new MergingByteBufferAllocatorImpl( allocatorNumber, size );
+
+ // Hack to ensure the pointers are always splitted as it was the case before.
+ allocator.setMinSizeThreshold( 0.0 );
+ allocator.setSizeRatioThreshold( 1.0 );
+
+ return allocator;
}
- public OffHeapMemoryBuffer<V> getActiveBuffer()
+ protected ByteBufferAllocator getAllocator( int allocatorIndex )
{
- return buffers.get( activeBufferIndex );
+ return allocators.get( allocatorIndex );
}
+ @Override
public Pointer<V> store( byte[] payload, int expiresIn )
{
- Pointer<V> p = getActiveBuffer().store( payload, expiresIn );
- if ( p == null )
+
+ int allocatorIndex = activeAllocatorIndex;
+
+ ByteBuffer buffer = getAllocator( allocatorIndex ).allocate( payload.length );
+
+ if (buffer == null && allocators.size() > 1)
{
- nextBuffer();
- p = getActiveBuffer().store( payload, expiresIn );
+ allocatorIndex = nextAllocator();
+ buffer = getAllocator( allocatorIndex ).allocate( payload.length );
}
+
+ if (buffer == null)
+ {
+ if (returnsNullWhenFull())
+ {
+ return null;
+ }
+ else
+ {
+ throw new BufferOverflowException();
+ }
+ }
+
+ buffer.rewind();
+ buffer.put( payload );
+
+ Pointer<V> p = instanciatePointer( buffer, allocatorIndex, expiresIn, NEVER_EXPIRES );
+
+ used.addAndGet( payload.length );
+
return p;
}
+ @Override
public Pointer<V> store( byte[] payload )
{
return store( payload, 0 );
}
+ @Override
public Pointer<V> update( Pointer<V> pointer, byte[] payload )
{
- return buffers.get( pointer.getBufferNumber() ).update( pointer, payload );
+ free( pointer );
+ return store( payload );
}
- public byte[] retrieve( Pointer<V> pointer )
+ @Override
+ public byte[] retrieve( final Pointer<V> pointer )
{
- return buffers.get( pointer.getBufferNumber() ).retrieve( pointer );
+ // check if pointer has not been freed before
+ if (!pointers.contains( pointer ))
+ {
+ return null;
+ }
+
+ pointer.hit();
+
+ final ByteBuffer buf = pointer.getDirectBuffer().asReadOnlyBuffer();
+ buf.rewind();
+
+ final byte[] swp = new byte[buf.limit()];
+ buf.get( swp );
+ return swp;
}
- public void free( Pointer<V> pointer )
+ @Override
+ public void free( final Pointer<V> pointer )
{
- buffers.get( pointer.getBufferNumber() ).free( pointer );
+ if ( !pointers.remove( pointer ) )
+ {
+ // pointers has been already freed.
+ //throw new IllegalArgumentException( "This pointer " + pointer + " has already been freed" );
+ return;
+ }
+
+ getAllocator( pointer.getBufferNumber() ).free( pointer.getDirectBuffer() );
+
+ used.addAndGet( - pointer.getCapacity() );
+
+ pointer.setFree( true );
}
+ @Override
public void clear()
{
- for ( OffHeapMemoryBuffer<V> buffer : buffers )
+ pointers.clear();
+ for (ByteBufferAllocator allocator : allocators)
{
- buffer.clear();
+ allocator.clear();
}
- activeBufferIndex = 0;
}
+ @Override
public long capacity()
{
long totalCapacity = 0;
- for ( OffHeapMemoryBuffer<V> buffer : buffers )
+ for (ByteBufferAllocator allocator : allocators)
{
- totalCapacity += buffer.capacity();
+ totalCapacity += allocator.getCapacity();
}
return totalCapacity;
}
+
+ @Override
+ public long used()
+ {
+ return used.get();
+ }
- public long collectExpired()
+ private final Predicate<Pointer<V>> relative = new Predicate<Pointer<V>>()
{
- long disposed = 0;
- for ( OffHeapMemoryBuffer<V> buffer : buffers )
+
+ @Override
+ public boolean apply( Pointer<V> input )
{
- disposed += buffer.collectExpired();
+ return !input.isFree() && !input.isExpired();
}
- return disposed;
- }
- public void collectLFU()
+ };
+
+ private final Predicate<Pointer<V>> absolute = new Predicate<Pointer<V>>()
{
- for ( OffHeapMemoryBuffer<V> buf : buffers )
+
+ @Override
+ public boolean apply( Pointer<V> input )
{
- buf.collectLFU( -1 );
+ return !input.isFree() && !input.isExpired();
}
- }
- public List<OffHeapMemoryBuffer<V>> getBuffers()
+ };
+
+ @Override
+ public long collectExpired()
{
- return buffers;
+ int limit = 50;
+ return free( limit( filter( pointers, relative ), limit ) )
+ + free( limit( filter( pointers, absolute ), limit ) );
+
}
- public void setBuffers( List<OffHeapMemoryBuffer<V>> buffers )
+ @Override
+ public void collectLFU()
{
- this.buffers = buffers;
+
+ int limit = pointers.size() / 10;
+
+ Iterable<Pointer<V>> result = from( new Comparator<Pointer<V>>()
+ {
+
+ public int compare( Pointer<V> o1, Pointer<V> o2 )
+ {
+ float f1 = o1.getFrequency();
+ float f2 = o2.getFrequency();
+
+ return Float.compare( f1, f2 );
+ }
+
+ } ).sortedCopy( limit( filter( pointers, new Predicate<Pointer<V>>()
+ {
+
+ @Override
+ public boolean apply( Pointer<V> input )
+ {
+ return !input.isFree();
+ }
+
+ } ), limit ) );
+
+ free( result );
+
}
- public <T extends V> Pointer<V> allocate( Class<T> type, int size, long expiresIn, long expires )
+ protected long free( Iterable<Pointer<V>> pointers )
{
- Pointer<V> p = getActiveBuffer().allocate( type, size, expiresIn, expires );
- if ( p == null )
+ long howMuch = 0;
+ for ( Pointer<V> expired : pointers )
{
- nextBuffer();
- p = getActiveBuffer().allocate( type, size, expiresIn, expires );
+ howMuch += expired.getCapacity();
+ free( expired );
}
- return p;
+ return howMuch;
+ }
+
+
+ protected List<ByteBufferAllocator> getAllocators()
+ {
+ return allocators;
+ }
+
+ @Deprecated
+ public List<OffHeapMemoryBuffer<V>> getBuffers()
+ {
+ return Collections.<OffHeapMemoryBuffer<V>>emptyList();
}
- protected void nextBuffer()
+ @Deprecated
+ @Override
+ public <T extends V> Pointer<V> allocate( final Class<T> type, final int size, final long expiresIn, final long expires )
+ {
+
+ int allocatorIndex = activeAllocatorIndex;
+
+ ByteBuffer buffer = getAllocator( allocatorIndex ).allocate( size );
+
+ if (buffer == null && allocators.size() > 1)
+ {
+ allocatorIndex = nextAllocator();
+ buffer = getAllocator( allocatorIndex ).allocate( size );
+ }
+
+ if (buffer == null)
+ {
+ if (returnsNullWhenFull())
+ {
+ return null;
+ }
+ else
+ {
+ throw new BufferOverflowException();
+ }
+ }
+
+ Pointer<V> pointer = instanciatePointer( buffer, allocatorIndex, expiresIn, expires );
+
+ if (pointer != null)
+ {
+ pointer.setClazz( type );
+ }
+
+ used.addAndGet( size );
+
+ return pointer;
+
+ }
+
+ protected Pointer<V> instanciatePointer( final ByteBuffer buffer, final int allocatorIndex, final long expiresIn, final long expires )
+ {
+
+ Pointer<V> p = new PointerImpl<V>();
+
+ p.setDirectBuffer( buffer );
+ p.setExpiration( expires, expiresIn );
+ p.setBufferNumber( allocatorIndex );
+ p.setFree( false );
+ p.createdNow();
+
+ pointers.add( p );
+
+ return p;
+ }
+
+ protected int nextAllocator()
+ {
+ activeAllocatorIndex = ( activeAllocatorIndex + 1 ) % allocators.size();
+ return activeAllocatorIndex;
+ }
+
+ protected boolean returnsNullWhenFull()
{
- activeBufferIndex = ( activeBufferIndex + 1 ) % buffers.size();
+ return returnNullWhenFull;
}
}
Modified: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceWithAllocationPolicyImpl.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceWithAllocationPolicyImpl.java?rev=1295522&r1=1295521&r2=1295522&view=diff
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceWithAllocationPolicyImpl.java (original)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceWithAllocationPolicyImpl.java Thu Mar 1 11:41:34 2012
@@ -1,6 +1,9 @@
package org.apache.directmemory.memory;
import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+
+import org.apache.directmemory.memory.allocator.ByteBufferAllocator;
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -29,53 +32,51 @@ import java.nio.BufferOverflowException;
public class MemoryManagerServiceWithAllocationPolicyImpl<V>
extends MemoryManagerServiceImpl<V>
{
-
- protected boolean returnNullWhenFull = true;
- protected AllocationPolicy<V> allocationPolicy;
+ protected AllocationPolicy allocationPolicy;
public MemoryManagerServiceWithAllocationPolicyImpl()
{
super();
}
- public MemoryManagerServiceWithAllocationPolicyImpl( final AllocationPolicy<V> allocationPolicy, final boolean returnNullWhenFull )
+ public MemoryManagerServiceWithAllocationPolicyImpl( final AllocationPolicy allocationPolicy, final boolean returnNullWhenFull )
{
- this();
+ super( returnNullWhenFull );
this.setAllocationPolicy( allocationPolicy );
- this.returnNullWhenFull = returnNullWhenFull;
}
@Override
public void init( int numberOfBuffers, int size )
{
super.init( numberOfBuffers, size );
- allocationPolicy.init( getBuffers() );
+ allocationPolicy.init( getAllocators() );
}
- public void setAllocationPolicy( final AllocationPolicy<V> allocationPolicy )
+ public void setAllocationPolicy( final AllocationPolicy allocationPolicy )
{
this.allocationPolicy = allocationPolicy;
}
- @Override
- public OffHeapMemoryBuffer<V> getActiveBuffer()
+
+ protected ByteBufferAllocator getAllocator()
{
- return allocationPolicy.getActiveBuffer( null, 0 );
+ return allocationPolicy.getActiveAllocator( null, 0 );
}
@Override
public Pointer<V> store( byte[] payload, int expiresIn )
{
Pointer<V> p = null;
- OffHeapMemoryBuffer<V> buffer = null;
- int allocationNumber = 1;
+ ByteBufferAllocator allocator = null;
+ int allocationNumber = 0;
do
{
- buffer = allocationPolicy.getActiveBuffer( buffer, allocationNumber );
- if ( buffer == null )
+ allocationNumber++;
+ allocator = allocationPolicy.getActiveAllocator( allocator, allocationNumber );
+ if ( allocator == null )
{
- if (returnNullWhenFull)
+ if (returnsNullWhenFull())
{
return null;
}
@@ -84,8 +85,20 @@ public class MemoryManagerServiceWithAll
throw new BufferOverflowException();
}
}
- p = buffer.store( payload, expiresIn );
- allocationNumber++;
+ final ByteBuffer buffer = allocator.allocate( payload.length );
+
+ if ( buffer == null )
+ {
+ continue;
+ }
+
+ p = instanciatePointer( buffer, allocator.getNumber(), expiresIn, NEVER_EXPIRES );
+
+ buffer.rewind();
+ buffer.put( payload );
+
+ used.addAndGet( payload.length );
+
}
while ( p == null );
return p;
@@ -99,17 +112,18 @@ public class MemoryManagerServiceWithAll
}
@Override
- public <T extends V> Pointer<V> allocate( Class<T> type, int size, long expiresIn, long expires )
+ public <T extends V> Pointer<V> allocate( final Class<T> type, final int size, final long expiresIn, final long expires )
{
Pointer<V> p = null;
- OffHeapMemoryBuffer<V> buffer = null;
- int allocationNumber = 1;
+ ByteBufferAllocator allocator = null;
+ int allocationNumber = 0;
do
{
- buffer = allocationPolicy.getActiveBuffer( buffer, allocationNumber );
- if ( buffer == null )
+ allocationNumber++;
+ allocator = allocationPolicy.getActiveAllocator( allocator, allocationNumber );
+ if ( allocator == null )
{
- if (returnNullWhenFull)
+ if (returnsNullWhenFull())
{
return null;
}
@@ -118,10 +132,22 @@ public class MemoryManagerServiceWithAll
throw new BufferOverflowException();
}
}
- p = buffer.allocate( type, size, expiresIn, expires );
- allocationNumber++;
+
+ final ByteBuffer buffer = allocator.allocate( size );
+
+ if ( buffer == null )
+ {
+ continue;
+ }
+
+ p = instanciatePointer( buffer, allocator.getNumber(), expiresIn, NEVER_EXPIRES );
+
+ used.addAndGet( size );
}
while ( p == null );
+
+ p.setClazz( type );
+
return p;
}
Modified: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMemoryBuffer.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMemoryBuffer.java?rev=1295522&r1=1295521&r2=1295522&view=diff
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMemoryBuffer.java (original)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMemoryBuffer.java Thu Mar 1 11:41:34 2012
@@ -21,6 +21,7 @@ package org.apache.directmemory.memory;
import java.util.Date;
+@Deprecated
public interface OffHeapMemoryBuffer<T>
{
Modified: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMemoryBufferImpl.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMemoryBufferImpl.java?rev=1295522&r1=1295521&r2=1295522&view=diff
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMemoryBufferImpl.java (original)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMemoryBufferImpl.java Thu Mar 1 11:41:34 2012
@@ -33,6 +33,7 @@ import org.apache.directmemory.measures.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@Deprecated
public class OffHeapMemoryBufferImpl<T>
extends AbstractOffHeapMemoryBuffer<T>
{
Modified: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/PointerImpl.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/PointerImpl.java?rev=1295522&r1=1295521&r2=1295522&view=diff
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/PointerImpl.java (original)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/PointerImpl.java Thu Mar 1 11:41:34 2012
@@ -74,7 +74,7 @@ public class PointerImpl<T>
@Override
public int getCapacity()
{
- return end - start + 1;
+ return directBuffer == null ? end - start + 1 : directBuffer.limit();
}
@Override
Modified: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/RoundRobinAllocationPolicy.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/RoundRobinAllocationPolicy.java?rev=1295522&r1=1295521&r2=1295522&view=diff
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/RoundRobinAllocationPolicy.java (original)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/RoundRobinAllocationPolicy.java Thu Mar 1 11:41:34 2012
@@ -22,6 +22,8 @@ package org.apache.directmemory.memory;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.directmemory.memory.allocator.ByteBufferAllocator;
+
/**
* Round Robin allocation policy. An internal counter is incremented (modulo the size of the buffer), so each calls to
* {@link #getActiveBuffer(OffHeapMemoryBuffer, int)} will increment the counter and return the buffer at the index of
@@ -29,15 +31,15 @@ import java.util.concurrent.atomic.Atomi
*
* @author bperroud
*/
-public class RoundRobinAllocationPolicy<T>
- implements AllocationPolicy<T>
+public class RoundRobinAllocationPolicy
+ implements AllocationPolicy
{
// Increment the counter and get the value. Need to start at -1 to have 0'index at first call.
private static final int BUFFERS_INDEX_INITIAL_VALUE = -1;
// All the buffers to allocate
- private List<OffHeapMemoryBuffer<T>> buffers;
+ private List<ByteBufferAllocator> allocators;
// Cyclic counter
private AtomicInteger buffersIndexCounter = new AtomicInteger( BUFFERS_INDEX_INITIAL_VALUE );
@@ -48,19 +50,19 @@ public class RoundRobinAllocationPolicy<
// Current max number of allocations
private int maxAllocations = DEFAULT_MAX_ALLOCATIONS;
- public void setMaxAllocations( int maxAllocations )
+ public void setMaxAllocations( final int maxAllocations )
{
this.maxAllocations = maxAllocations;
}
@Override
- public void init( List<OffHeapMemoryBuffer<T>> buffers )
+ public void init( final List<ByteBufferAllocator> allocators )
{
- this.buffers = buffers;
+ this.allocators = allocators;
}
@Override
- public OffHeapMemoryBuffer<T> getActiveBuffer( OffHeapMemoryBuffer<T> previouslyAllocatedBuffer, int allocationNumber )
+ public ByteBufferAllocator getActiveAllocator( final ByteBufferAllocator previousAllocator, final int allocationNumber )
{
// If current allocation is more than the limit, return a null buffer.
if ( allocationNumber > maxAllocations )
@@ -71,9 +73,9 @@ public class RoundRobinAllocationPolicy<
// Thread safely increment and get the next buffer's index
int i = incrementAndGetBufferIndex();
- final OffHeapMemoryBuffer<T> buffer = buffers.get( i );
+ final ByteBufferAllocator allocator = allocators.get( i );
- return buffer;
+ return allocator;
}
@Override
@@ -95,7 +97,7 @@ public class RoundRobinAllocationPolicy<
do
{
int currentIndex = buffersIndexCounter.get();
- newIndex = ( currentIndex + 1 ) % buffers.size();
+ newIndex = ( currentIndex + 1 ) % allocators.size();
updateOk = buffersIndexCounter.compareAndSet( currentIndex, newIndex );
}
while ( !updateOk );
Added: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/AbstractByteBufferAllocator.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/AbstractByteBufferAllocator.java?rev=1295522&view=auto
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/AbstractByteBufferAllocator.java (added)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/AbstractByteBufferAllocator.java Thu Mar 1 11:41:34 2012
@@ -0,0 +1,39 @@
+package org.apache.directmemory.memory.allocator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public abstract class AbstractByteBufferAllocator
+ implements ByteBufferAllocator
+{
+
+ private final int number;
+
+ AbstractByteBufferAllocator( final int number )
+ {
+ this.number = number;
+ }
+
+ @Override
+ public int getNumber()
+ {
+ return number;
+ }
+
+}
Added: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/ByteBufferAllocator.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/ByteBufferAllocator.java?rev=1295522&view=auto
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/ByteBufferAllocator.java (added)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/ByteBufferAllocator.java Thu Mar 1 11:41:34 2012
@@ -0,0 +1,53 @@
+package org.apache.directmemory.memory.allocator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.Closeable;
+import java.nio.ByteBuffer;
+
+/**
+ *
+ *
+ *
+ */
+public interface ByteBufferAllocator
+ extends Closeable
+{
+
+ /**
+ * Return the given ByteBuffer making it available for a future usage
+ * @param buffer
+ */
+ void free( final ByteBuffer buffer );
+
+ /**
+ * Allocate the given size off heap, or return null if the allocation failed.
+ * @param size
+ * @return
+ */
+ ByteBuffer allocate( final int size );
+
+ void clear();
+
+ int getCapacity();
+
+ int getNumber();
+
+}
Added: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/DirectByteBufferUtils.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/DirectByteBufferUtils.java?rev=1295522&view=auto
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/DirectByteBufferUtils.java (added)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/DirectByteBufferUtils.java Thu Mar 1 11:41:34 2012
@@ -0,0 +1,28 @@
+package org.apache.directmemory.memory.allocator;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class DirectByteBufferUtils
+{
+
+ public static void destroyDirectByteBuffer( final ByteBuffer buffer )
+ throws IllegalArgumentException, IllegalAccessException, InvocationTargetException, SecurityException,
+ NoSuchMethodException
+ {
+
+ checkArgument( buffer.isDirect(), "toBeDestroyed isn't direct!" );
+
+ Method cleanerMethod = buffer.getClass().getMethod( "cleaner" );
+ cleanerMethod.setAccessible( true );
+ Object cleaner = cleanerMethod.invoke( buffer );
+ Method cleanMethod = cleaner.getClass().getMethod( "clean" );
+ cleanMethod.setAccessible( true );
+ cleanMethod.invoke( cleaner );
+
+ }
+
+}
Added: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/FixedSizeByteBufferAllocatorImpl.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/FixedSizeByteBufferAllocatorImpl.java?rev=1295522&view=auto
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/FixedSizeByteBufferAllocatorImpl.java (added)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/FixedSizeByteBufferAllocatorImpl.java Thu Mar 1 11:41:34 2012
@@ -0,0 +1,215 @@
+package org.apache.directmemory.memory.allocator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * {@link ByteBufferAllocator} implementation that instantiate {@link ByteBuffer}s of fixed size, called slices.
+ *
+ * @author bperroud
+ *
+ */
+public class FixedSizeByteBufferAllocatorImpl
+ extends AbstractByteBufferAllocator
+{
+
+ protected static Logger logger = LoggerFactory.getLogger( FixedSizeByteBufferAllocatorImpl.class );
+
+ // Collection that keeps track of the parent buffers (segments) where slices are allocated
+ private final Set<ByteBuffer> segmentsBuffers = new HashSet<ByteBuffer>();
+
+ // Collection that owns all slice that can be used.
+ private final Queue<ByteBuffer> freeBuffers = new ConcurrentLinkedQueue<ByteBuffer>();
+
+ // Size of each slices dividing each segments of the slab
+ private final int sliceSize;
+
+ // Total size of the current slab
+ private int totalSize;
+
+ // Tells if one need to keep track of borrowed buffers
+ private boolean keepTrackOfUsedSliceBuffers = false;
+
+ // Tells if it returns null or throw an BufferOverflowExcpetion with the requested size is bigger than the size of the slices
+ private boolean returnNullWhenOversizingSliceSize = true;
+
+ // Tells if it returns null when no buffers are available
+ private boolean returnNullWhenNoBufferAvailable = true;
+
+ // Collection that keeps track of borrowed buffers
+ private final Set<ByteBuffer> usedSliceBuffers = Collections
+ .newSetFromMap( new ConcurrentHashMap<ByteBuffer, Boolean>() );
+
+ protected Logger getLogger()
+ {
+ return logger;
+ }
+
+ /**
+ * Constructor.
+ * @param totalSize : the internal buffer
+ * @param sliceSize : arbitrary number of the buffer.
+ * @param numberOfSegments :
+ */
+ FixedSizeByteBufferAllocatorImpl( final int number, final int totalSize, final int sliceSize, final int numberOfSegments )
+ {
+ super( number );
+
+ this.totalSize = totalSize;
+ this.sliceSize = sliceSize;
+
+ init( numberOfSegments );
+
+ }
+
+ protected void init( final int numberOfSegments )
+ {
+ // Compute the size of each segments
+ int segmentSize = totalSize / numberOfSegments;
+ // size is rounded down to a multiple of the slice size
+ segmentSize -= segmentSize % sliceSize;
+
+ for ( int i = 0; i < numberOfSegments; i++ )
+ {
+ final ByteBuffer segment = ByteBuffer.allocateDirect( segmentSize );
+ segmentsBuffers.add( segment );
+
+ for ( int j = 0; j < segment.capacity(); j += sliceSize )
+ {
+ segment.clear();
+ segment.position( j );
+ segment.limit( j + sliceSize );
+ final ByteBuffer slice = segment.slice();
+ freeBuffers.add( slice );
+ }
+ }
+ }
+
+
+ protected ByteBuffer findFreeBuffer( int capacity )
+ {
+ if ( capacity > sliceSize )
+ {
+ if (returnNullWhenOversizingSliceSize)
+ {
+ return null;
+ }
+ else
+ {
+ throw new BufferOverflowException();
+ }
+ }
+ // TODO : Add capacity to wait till a given timeout for a freed buffer
+ return freeBuffers.poll();
+ }
+
+ @Override
+ public void free( final ByteBuffer byteBuffer )
+ {
+
+ if ( keepTrackOfUsedSliceBuffers && !usedSliceBuffers.remove( byteBuffer ) )
+ {
+ return;
+ }
+
+ Preconditions.checkArgument( byteBuffer.capacity() == sliceSize );
+
+ freeBuffers.offer( byteBuffer );
+
+ }
+
+ @Override
+ public ByteBuffer allocate( int size )
+ {
+
+ ByteBuffer allocatedByteBuffer = findFreeBuffer( size );
+
+ if ( allocatedByteBuffer == null )
+ {
+ if (returnNullWhenNoBufferAvailable)
+ {
+ return null;
+ }
+ else
+ {
+ throw new BufferOverflowException();
+ }
+ }
+
+ allocatedByteBuffer.clear();
+ allocatedByteBuffer.limit( size );
+
+ if ( keepTrackOfUsedSliceBuffers )
+ {
+ usedSliceBuffers.add( allocatedByteBuffer );
+ }
+
+ return allocatedByteBuffer;
+
+ }
+
+ public int getSliceSize()
+ {
+ return sliceSize;
+ }
+
+ @Override
+ public void clear()
+ {
+ // Nothing to do.
+ }
+
+ @Override
+ public int getCapacity()
+ {
+ return totalSize;
+ }
+
+ @Override
+ public void close()
+ {
+ clear();
+
+ for ( final ByteBuffer buffer : segmentsBuffers )
+ {
+ try
+ {
+ DirectByteBufferUtils.destroyDirectByteBuffer( buffer );
+ }
+ catch (Exception e)
+ {
+
+ }
+ }
+ }
+}
Added: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/MergingByteBufferAllocatorImpl.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/MergingByteBufferAllocatorImpl.java?rev=1295522&view=auto
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/MergingByteBufferAllocatorImpl.java (added)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/MergingByteBufferAllocatorImpl.java Thu Mar 1 11:41:34 2012
@@ -0,0 +1,421 @@
+package org.apache.directmemory.memory.allocator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.SortedMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * {@link ByteBufferAllocator} implementation with {@link ByteBuffer} merging capabilities
+ *
+ * {@link TreeMap} can be safely used because synchronization is achieved through a {@link Lock}
+ *
+ * @author bperroud
+ *
+ */
+public class MergingByteBufferAllocatorImpl
+ extends AbstractByteBufferAllocator
+{
+
+ private static final double DEFAULT_SIZE_RATIO_THRESHOLD = 0.9;
+
+ private static final int DEFAULT_MIN_SIZE_THRESHOLD = 128;
+
+ protected static Logger logger = LoggerFactory.getLogger( MergingByteBufferAllocatorImpl.class );
+
+ // List of free pointers, with several list of different size
+ private final NavigableMap<Integer, Collection<LinkedByteBuffer>> freePointers = new ConcurrentSkipListMap<Integer, Collection<LinkedByteBuffer>>();
+
+ // Set of used pointers. The key is a hash of ByteBuffer.
+ private final Map<Integer, LinkedByteBuffer> usedPointers = new ConcurrentHashMap<Integer, LinkedByteBuffer>();
+
+ // Lock used instead of synchronized block to guarantee consistency when manipulating list of pointers.
+ private final Lock pointerManipulationLock = new ReentrantLock();
+
+ private final ByteBuffer parentBuffer;
+
+ // Allowed size ratio of the returned pointer before splitting the pointer
+ private double sizeRatioThreshold = DEFAULT_SIZE_RATIO_THRESHOLD;
+
+ //
+ private double minSizeThreshold = DEFAULT_MIN_SIZE_THRESHOLD;
+
+ private boolean returnNullWhenBufferIsFull = true;
+
+ protected Logger getLogger()
+ {
+ return logger;
+ }
+
+ /**
+ * Constructor.
+ * @param buffer : the internal buffer
+ * @param bufferNumber : arbitrary number of the buffer.
+ */
+ public MergingByteBufferAllocatorImpl( final int number, final int totalSize )
+ {
+ super( number );
+
+ parentBuffer = ByteBuffer.allocateDirect( totalSize );
+ init();
+ }
+
+ /**
+ * Initialization function. Create an initial free {@link Pointer} mapping the whole buffer.
+ */
+ protected void init()
+ {
+ Integer totalSize = Integer.valueOf( parentBuffer.capacity() );
+
+ for ( Integer i : generateQueueSizes( totalSize ) )
+ {
+ freePointers.put( Integer.valueOf( i ), new LinkedHashSet<LinkedByteBuffer>() );
+ }
+
+ initFirstBuffer();
+
+ }
+
+ private void initFirstBuffer()
+ {
+ parentBuffer.clear();
+ final ByteBuffer initialBuffer = parentBuffer.slice();
+ final LinkedByteBuffer initialLinkedBuffer = new LinkedByteBuffer( 0, initialBuffer, null, null );
+
+ insertLinkedBuffer( initialLinkedBuffer );
+ }
+
+
+ protected List<Integer> generateQueueSizes( final Integer totalSize )
+ {
+ List<Integer> sizes = new ArrayList<Integer>();
+
+ for ( int i = 128; i <= totalSize; i *= 8 )
+ {
+ sizes.add( Integer.valueOf( i ) );
+ }
+
+ // If totalSize < 128 or totalSize is not a multiple of 128
+ // we force adding an element to the map
+ if ( sizes.isEmpty() || !sizes.contains( totalSize ) )
+ {
+ sizes.add( totalSize );
+ }
+
+ return sizes;
+ }
+
+ @Override
+ public void free( final ByteBuffer buffer )
+ {
+
+ LinkedByteBuffer returningLinkedBuffer = usedPointers.remove( getHash( buffer ) );
+
+ if ( returningLinkedBuffer == null )
+ {
+ // Hu ? returned twice ? Not returned at the right place ?
+ throw new IllegalArgumentException( "The buffer " + buffer + " seems not to belong to this allocator" );
+ }
+
+ try
+ {
+ pointerManipulationLock.lock();
+
+ if ( returningLinkedBuffer.getBefore() != null )
+ {
+ // if returningLinkedBuffer.getBefore is in the free list, it is free, then it's free and can be merged
+ if (getFreeLinkedByteBufferCollection( returningLinkedBuffer.getBefore() ).contains( returningLinkedBuffer.getBefore() ) )
+ {
+ returningLinkedBuffer = mergePointer( returningLinkedBuffer.getBefore(), returningLinkedBuffer );
+ }
+ }
+
+ if ( returningLinkedBuffer.getAfter() != null )
+ {
+ // if returningLinkedBuffer.getAfter is in the free list, it is free, it is free, then it's free and can be merged
+ if (getFreeLinkedByteBufferCollection( returningLinkedBuffer.getAfter() ).contains( returningLinkedBuffer.getAfter() ) )
+ {
+ returningLinkedBuffer = mergePointer( returningLinkedBuffer, returningLinkedBuffer.getAfter() );
+ }
+ }
+
+ insertLinkedBuffer( returningLinkedBuffer );
+ }
+ finally
+ {
+ pointerManipulationLock.unlock();
+ }
+ }
+
+ @Override
+ public ByteBuffer allocate( final int size )
+ {
+
+ try
+ {
+ pointerManipulationLock.lock();
+
+ final SortedMap<Integer, Collection<LinkedByteBuffer>> freeMap = freePointers
+ .tailMap( size - 1 );
+ for ( final Map.Entry<Integer, Collection<LinkedByteBuffer>> bufferQueueEntry : freeMap.entrySet() )
+ {
+
+ Iterator<LinkedByteBuffer> linkedByteBufferIterator = bufferQueueEntry.getValue().iterator();
+
+ while ( linkedByteBufferIterator.hasNext() )
+ {
+ LinkedByteBuffer linkedBuffer = linkedByteBufferIterator.next();
+
+ if ( linkedBuffer.getBuffer().capacity() >= size )
+ {
+ // Remove this element from the collection
+ linkedByteBufferIterator.remove();
+
+ LinkedByteBuffer returnedLinkedBuffer = linkedBuffer;
+
+ // Check if splitting need to be performed
+ if ( linkedBuffer.getBuffer().capacity() > minSizeThreshold
+ && ( 1.0 * size / linkedBuffer.getBuffer().capacity() ) < sizeRatioThreshold )
+ {
+ // Split the buffer in a buffer that will be returned and another buffer reinserted in the corresponding queue.
+ parentBuffer.clear();
+ parentBuffer.position( linkedBuffer.getOffset() );
+ parentBuffer.limit( linkedBuffer.getOffset() + size );
+ final ByteBuffer newBuffer = parentBuffer.slice();
+
+ returnedLinkedBuffer = new LinkedByteBuffer( linkedBuffer.getOffset(), newBuffer,
+ linkedBuffer.getBefore(), null );
+
+ if (linkedBuffer.getBefore() != null)
+ {
+ linkedBuffer.getBefore().setAfter( returnedLinkedBuffer );
+ }
+
+ parentBuffer.clear();
+ parentBuffer.position( linkedBuffer.getOffset() + size );
+ parentBuffer.limit( linkedBuffer.getOffset() + linkedBuffer.getBuffer().capacity() );
+ final ByteBuffer remainingBuffer = parentBuffer.slice();
+ final LinkedByteBuffer remainingLinkedBuffer = new LinkedByteBuffer(
+ linkedBuffer.getOffset() + size, remainingBuffer,
+ returnedLinkedBuffer, linkedBuffer.getAfter() );
+
+ if (linkedBuffer.getAfter() != null)
+ {
+ linkedBuffer.getAfter().setBefore( remainingLinkedBuffer );
+ }
+
+ returnedLinkedBuffer.setAfter( remainingLinkedBuffer );
+
+ insertLinkedBuffer( remainingLinkedBuffer );
+
+ }
+ else
+ {
+ // If the buffer is not split, set the limit accordingly
+ returnedLinkedBuffer.getBuffer().clear();
+ returnedLinkedBuffer.getBuffer().limit( size );
+ }
+
+ usedPointers.put( getHash( returnedLinkedBuffer.getBuffer() ), returnedLinkedBuffer );
+
+ return returnedLinkedBuffer.getBuffer();
+ }
+
+ }
+ }
+
+ if (returnNullWhenBufferIsFull)
+ {
+ return null;
+ }
+ else
+ {
+ throw new BufferOverflowException();
+ }
+
+ }
+ finally
+ {
+ pointerManipulationLock.unlock();
+ }
+ }
+
+ @Override
+ public void clear()
+ {
+ usedPointers.clear();
+
+ for ( final Map.Entry<Integer, Collection<LinkedByteBuffer>> bufferQueueEntry : freePointers.entrySet() )
+ {
+ bufferQueueEntry.getValue().clear();
+ }
+
+ initFirstBuffer();
+ }
+
+ private void insertLinkedBuffer( final LinkedByteBuffer linkedBuffer )
+ {
+ getFreeLinkedByteBufferCollection( linkedBuffer ).add( linkedBuffer );
+ }
+
+ private Collection<LinkedByteBuffer> getFreeLinkedByteBufferCollection( final LinkedByteBuffer linkedBuffer )
+ {
+ final Integer size = Integer.valueOf( linkedBuffer.getBuffer().capacity() - 1 );
+ final Map.Entry<Integer, Collection<LinkedByteBuffer>> bufferCollectionEntry = freePointers.ceilingEntry( size );
+ return bufferCollectionEntry.getValue();
+ }
+
+ private LinkedByteBuffer mergePointer( final LinkedByteBuffer first, final LinkedByteBuffer next )
+ {
+ parentBuffer.clear();
+ parentBuffer.position( first.getOffset() );
+ parentBuffer.limit( first.getOffset() + first.getBuffer().capacity() + next.getBuffer().capacity() );
+ final ByteBuffer newByteBuffer = parentBuffer.slice();
+
+ final LinkedByteBuffer newLinkedByteBuffer = new LinkedByteBuffer( first.getOffset(), newByteBuffer,
+ first.getBefore(), next.getAfter() );
+
+ if ( first.getBefore() != null )
+ {
+ first.getBefore().setAfter( newLinkedByteBuffer );
+ }
+
+ if ( next.getAfter() != null )
+ {
+ next.getAfter().setBefore( newLinkedByteBuffer );
+ }
+
+ // Remove the two pointers from their corresponding free lists.
+ getFreeLinkedByteBufferCollection( first ).remove( first );
+ getFreeLinkedByteBufferCollection( next ).remove( next );
+
+ return newLinkedByteBuffer;
+ }
+
+ private static Integer getHash( final ByteBuffer buffer )
+ {
+ return Integer.valueOf( System.identityHashCode( buffer ) );
+ }
+
+ public void setSizeRatioThreshold( final double sizeRatioThreshold )
+ {
+ this.sizeRatioThreshold = sizeRatioThreshold;
+ }
+
+ public void setMinSizeThreshold( final double minSizeThreshold )
+ {
+ this.minSizeThreshold = minSizeThreshold;
+ }
+
+ public void setReturnNullWhenBufferIsFull( boolean returnNullWhenBufferIsFull )
+ {
+ this.returnNullWhenBufferIsFull = returnNullWhenBufferIsFull;
+ }
+
+ @Override
+ public int getCapacity()
+ {
+ return parentBuffer.capacity();
+ }
+
+ private static class LinkedByteBuffer
+ {
+ private final int offset;
+
+ private final ByteBuffer buffer;
+
+ private volatile LinkedByteBuffer before;
+
+ private volatile LinkedByteBuffer after;
+
+ public LinkedByteBuffer( final int offset, final ByteBuffer buffer, final LinkedByteBuffer before,
+ final LinkedByteBuffer after )
+ {
+ this.offset = offset;
+ this.buffer = buffer;
+ setBefore( before );
+ setAfter( after );
+ }
+
+ public ByteBuffer getBuffer()
+ {
+ return buffer;
+ }
+
+ public int getOffset()
+ {
+ return offset;
+ }
+
+ public LinkedByteBuffer getBefore()
+ {
+ return before;
+ }
+
+ public void setBefore( final LinkedByteBuffer before )
+ {
+ this.before = before;
+ }
+
+ public LinkedByteBuffer getAfter()
+ {
+ return after;
+ }
+
+ public void setAfter( final LinkedByteBuffer after )
+ {
+ this.after = after;
+ }
+ }
+
+ @Override
+ public void close()
+ throws IOException
+ {
+ clear();
+
+ try {
+ DirectByteBufferUtils.destroyDirectByteBuffer( parentBuffer );
+ }
+ catch (Exception e)
+ {
+
+ }
+ }
+
+
+}
Added: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/SlabByteBufferAllocatorImpl.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/SlabByteBufferAllocatorImpl.java?rev=1295522&view=auto
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/SlabByteBufferAllocatorImpl.java (added)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/SlabByteBufferAllocatorImpl.java Thu Mar 1 11:41:34 2012
@@ -0,0 +1,215 @@
+package org.apache.directmemory.memory.allocator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * {@link ByteBufferAllocator} implementation that instantiate uses {@link FixedSizeByteBufferAllocatorImpl} of different size to allocate best matching's size {@link ByteBuffer}
+ *
+ * @author bperroud
+ *
+ */
+public class SlabByteBufferAllocatorImpl
+ extends AbstractByteBufferAllocator
+{
+
+ protected static Logger logger = LoggerFactory.getLogger( SlabByteBufferAllocatorImpl.class );
+
+ // Tells if it returns null when no buffers are available
+ private boolean returnNullWhenNoBufferAvailable = true;
+
+ // Internal slabs sorted by sliceSize
+ private final NavigableMap<Integer, FixedSizeByteBufferAllocatorImpl> slabs = new ConcurrentSkipListMap<Integer, FixedSizeByteBufferAllocatorImpl>();
+
+ private final boolean allowAllocationToBiggerSlab;
+
+ protected Logger getLogger()
+ {
+ return logger;
+ }
+
+ /**
+ * Constructor.
+ * @param totalSize : the internal buffer
+ * @param sliceSize : arbitrary number of the buffer.
+ * @param numberOfSegments :
+ */
+ SlabByteBufferAllocatorImpl( final int number, final Collection<FixedSizeByteBufferAllocatorImpl> slabs, final boolean allowAllocationToBiggerSlab )
+ {
+ super( number );
+
+ this.allowAllocationToBiggerSlab = allowAllocationToBiggerSlab;
+
+ for (FixedSizeByteBufferAllocatorImpl slab : slabs)
+ {
+ this.slabs.put( slab.getSliceSize(), slab );
+ }
+
+ }
+
+
+ private FixedSizeByteBufferAllocatorImpl getSlabThatMatchTheSize( final int size )
+ {
+ // Find the slab that can carry the wanted size. -1 is used because higherEntry returns a strictly higher entry.
+ final Map.Entry<Integer, FixedSizeByteBufferAllocatorImpl> entry = slabs.higherEntry( size - 1 );
+
+ if ( entry != null )
+ {
+ return entry.getValue();
+ }
+
+ // If an entry has not been found, this means that no slabs has bigger enough slices to allocate the given size
+ return null;
+ }
+
+ @Override
+ public void free( final ByteBuffer byteBuffer )
+ {
+
+ final FixedSizeByteBufferAllocatorImpl slab = getSlabThatMatchTheSize( byteBuffer.capacity() );
+
+ if (slab == null)
+ {
+ // Hu ? where this bytebuffer come from ??
+ return;
+ }
+
+ slab.free( byteBuffer );
+
+ }
+
+ @Override
+ public ByteBuffer allocate( final int size )
+ {
+
+
+ final FixedSizeByteBufferAllocatorImpl slab = getSlabThatMatchTheSize( size );
+
+ if ( slab == null )
+ {
+ // unable to store such big objects
+ if (returnNullWhenNoBufferAvailable)
+ {
+ return null;
+ }
+ else
+ {
+ throw new BufferOverflowException();
+ }
+ }
+
+ // Try to allocate the given size
+ final ByteBuffer byteBuffer = slab.allocate( size );
+
+ // If allocation succeed, return the buffer
+ if (byteBuffer != null)
+ {
+ return byteBuffer;
+ }
+
+ // Otherwise we have the option to allow in a bigger slab.
+ if (!allowAllocationToBiggerSlab)
+ {
+ if (returnNullWhenNoBufferAvailable)
+ {
+ return null;
+ }
+ else
+ {
+ throw new BufferOverflowException();
+ }
+ }
+ else
+ {
+ // We can try to allocate to a bigger slab.
+ // size + 1 here because getSlabThatMatchTheSize do a size -1 and thus will return the same slab
+ final int biggerSize = slab.getSliceSize() + 1;
+ final FixedSizeByteBufferAllocatorImpl biggerSlab = getSlabThatMatchTheSize( biggerSize );
+ if (biggerSlab == null)
+ {
+ // We were already trying to allocate in the biggest slab
+ if (returnNullWhenNoBufferAvailable)
+ {
+ return null;
+ }
+ else
+ {
+ throw new BufferOverflowException();
+ }
+ }
+
+ final ByteBuffer secondByteBuffer = biggerSlab.allocate( size );
+
+ if (secondByteBuffer == null)
+ {
+ if (returnNullWhenNoBufferAvailable)
+ {
+ return null;
+ }
+ else
+ {
+ throw new BufferOverflowException();
+ }
+ }
+ else
+ {
+ return secondByteBuffer;
+ }
+ }
+ }
+
+ @Override
+ public void clear()
+ {
+ // Nothing to do.
+ }
+
+ @Override
+ public int getCapacity()
+ {
+ int totalSize = 0;
+ for (final Map.Entry<Integer, FixedSizeByteBufferAllocatorImpl> entry : slabs.entrySet())
+ {
+ totalSize += entry.getValue().getCapacity();
+ }
+ return totalSize;
+ }
+
+ @Override
+ public void close()
+ throws IOException
+ {
+ for (final Map.Entry<Integer, FixedSizeByteBufferAllocatorImpl> entry : slabs.entrySet())
+ {
+ entry.getValue().close();
+ }
+ }
+}
Modified: incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/cache/CacheServiceImplTest.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/cache/CacheServiceImplTest.java?rev=1295522&r1=1295521&r2=1295522&view=diff
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/cache/CacheServiceImplTest.java (original)
+++ incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/cache/CacheServiceImplTest.java Thu Mar 1 11:41:34 2012
@@ -36,7 +36,7 @@ public class CacheServiceImplTest
@Test
public void testOffHeapExceedMemoryReturnNullWhenTrue()
{
- AllocationPolicy<byte[]> allocationPolicy = new RoundRobinAllocationPolicy<byte[]>();
+ AllocationPolicy allocationPolicy = new RoundRobinAllocationPolicy();
MemoryManagerService<byte[]> memoryManager =
new MemoryManagerServiceWithAllocationPolicyImpl<byte[]>( allocationPolicy, true );
CacheService<Integer, byte[]> cache = new DirectMemory<Integer, byte[]>()
Modified: incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/AbstractOffHeapMemoryBufferTest.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/AbstractOffHeapMemoryBufferTest.java?rev=1295522&r1=1295521&r2=1295522&view=diff
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/AbstractOffHeapMemoryBufferTest.java (original)
+++ incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/AbstractOffHeapMemoryBufferTest.java Thu Mar 1 11:41:34 2012
@@ -27,7 +27,6 @@ import java.util.Random;
import junit.framework.Assert;
import org.apache.directmemory.memory.MemoryManagerService;
-import org.apache.directmemory.memory.OffHeapMergingMemoryBufferImpl;
import org.apache.directmemory.memory.OffHeapMemoryBuffer;
import org.apache.directmemory.memory.Pointer;
import org.junit.Test;
@@ -314,43 +313,6 @@ public abstract class AbstractOffHeapMem
}
@Test
- public void testRandomPayload2()
- {
-
- final int NUMBER_OF_OBJECTS = 10;
- final int BUFFER_SIZE = NUMBER_OF_OBJECTS * SMALL_PAYLOAD_LENGTH;
-
- final OffHeapMergingMemoryBufferImpl<Object> offHeapLinkedMemoryBuffer = OffHeapMergingMemoryBufferImpl
- .createNew( BUFFER_SIZE );
-
- byte[] payload1 = MemoryTestUtils.generateRandomPayload( 2 * SMALL_PAYLOAD_LENGTH );
- Pointer<Object> pointer1 = offHeapLinkedMemoryBuffer.store( payload1 );
- Assert.assertNotNull( pointer1 );
-
- byte[] fetchedPayload1 = offHeapLinkedMemoryBuffer.retrieve( pointer1 );
- Assert.assertEquals( new String( payload1 ), new String( fetchedPayload1 ) );
-
- byte[] payload2 = MemoryTestUtils.generateRandomPayload( SMALL_PAYLOAD_LENGTH );
- Pointer<Object> pointer2 = offHeapLinkedMemoryBuffer.store( payload2 );
- Assert.assertNotNull( pointer2 );
-
- byte[] fetchedPayload2 = offHeapLinkedMemoryBuffer.retrieve( pointer2 );
- Assert.assertEquals( new String( payload2 ), new String( fetchedPayload2 ) );
-
- offHeapLinkedMemoryBuffer.free( pointer1 );
-
- offHeapLinkedMemoryBuffer.free( pointer1 );
-
- byte[] payload3 = MemoryTestUtils.generateRandomPayload( SMALL_PAYLOAD_LENGTH );
- Pointer<Object> pointer3 = offHeapLinkedMemoryBuffer.store( payload3 );
- Assert.assertNotNull( pointer3 );
-
- byte[] fetchedPayload3 = offHeapLinkedMemoryBuffer.retrieve( pointer3 );
- Assert.assertEquals( new String( payload3 ), new String( fetchedPayload3 ) );
-
- }
-
- @Test
public void testUpdate()
{
Modified: incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/Concurrent2Test.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/Concurrent2Test.java?rev=1295522&r1=1295521&r2=1295522&view=diff
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/Concurrent2Test.java (original)
+++ incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/Concurrent2Test.java Thu Mar 1 11:41:34 2012
@@ -28,7 +28,6 @@ import com.carrotsearch.junitbenchmarks.
import com.google.common.collect.MapMaker;
import org.apache.directmemory.measures.Ram;
import org.apache.directmemory.memory.MemoryManager;
-import org.apache.directmemory.memory.OffHeapMemoryBuffer;
import org.apache.directmemory.memory.Pointer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -204,11 +203,10 @@ public class Concurrent2Test
private static Logger logger = LoggerFactory.getLogger( Concurrent2Test.class );
- private static void dump( OffHeapMemoryBuffer<Object> mem )
+ private static void dump( MemoryManagerService<Object> mms )
{
- logger.info( "off-heap - buffer: " + mem.getBufferNumber() );
- logger.info( "off-heap - allocated: " + Ram.inMb( mem.capacity() ) );
- logger.info( "off-heap - used: " + Ram.inMb( mem.used() ) );
+ logger.info( "off-heap - allocated: " + Ram.inMb( mms.capacity() ) );
+ logger.info( "off-heap - used: " + Ram.inMb( mms.used() ) );
logger.info( "heap - max: " + Ram.inMb( Runtime.getRuntime().maxMemory() ) );
logger.info( "heap - allocated: " + Ram.inMb( Runtime.getRuntime().totalMemory() ) );
logger.info( "heap - free : " + Ram.inMb( Runtime.getRuntime().freeMemory() ) );
@@ -225,10 +223,7 @@ public class Concurrent2Test
public static void dump()
{
- for ( OffHeapMemoryBuffer<Object> mem : MemoryManager.getBuffers() )
- {
- dump( mem );
- }
+ dump( MemoryManager.getMemoryManager() );
logger.info( "************************************************" );
logger.info( "entries: " + entries );
Modified: incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/Concurrent3Test.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/Concurrent3Test.java?rev=1295522&r1=1295521&r2=1295522&view=diff
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/Concurrent3Test.java (original)
+++ incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/Concurrent3Test.java Thu Mar 1 11:41:34 2012
@@ -27,7 +27,6 @@ import com.carrotsearch.junitbenchmarks.
import com.google.common.collect.MapMaker;
import org.apache.directmemory.measures.Ram;
import org.apache.directmemory.memory.MemoryManager;
-import org.apache.directmemory.memory.OffHeapMemoryBuffer;
import org.apache.directmemory.memory.Pointer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -226,12 +225,11 @@ public class Concurrent3Test
private static Logger logger = LoggerFactory.getLogger( Concurrent3Test.class );
- private static void dump( OffHeapMemoryBuffer<Object> mem )
+ private static void dump( MemoryManagerService<Object> mms )
{
- logger.info( "off-heap - buffer: " + mem.getBufferNumber() );
- logger.info( "off-heap - allocated: " + Ram.inMb( mem.capacity() ) );
- logger.info( "off-heap - used: " + Ram.inMb( mem.used() ) );
- logger.info( "heap - max: " + Ram.inMb( Runtime.getRuntime().maxMemory() ) );
+ logger.info( "off-heap - allocated: " + Ram.inMb( mms.capacity() ) );
+ logger.info( "off-heap - used: " + Ram.inMb( mms.used() ) );
+ logger.info( "heap - max: " + Ram.inMb( Runtime.getRuntime().maxMemory() ) );
logger.info( "heap - allocated: " + Ram.inMb( Runtime.getRuntime().totalMemory() ) );
logger.info( "heap - free : " + Ram.inMb( Runtime.getRuntime().freeMemory() ) );
logger.info( "************************************************" );
@@ -247,10 +245,7 @@ public class Concurrent3Test
public static void dump()
{
- for ( OffHeapMemoryBuffer<Object> mem : MemoryManager.getBuffers() )
- {
- dump( mem );
- }
+ dump( MemoryManager.getMemoryManager() );
logger.info( "************************************************" );
logger.info( "entries: " + entries );
Modified: incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/MemoryManagerServiceImplTest.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/MemoryManagerServiceImplTest.java?rev=1295522&r1=1295521&r2=1295522&view=diff
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/MemoryManagerServiceImplTest.java (original)
+++ incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/MemoryManagerServiceImplTest.java Thu Mar 1 11:41:34 2012
@@ -130,7 +130,7 @@ public class MemoryManagerServiceImplTes
}
// Buffer is fully used.
- Assert.assertEquals( BUFFER_SIZE, memoryManagerService.getBuffers().get( 0 ).used() );
+ Assert.assertEquals( BUFFER_SIZE, memoryManagerService.used() );
Assert.assertNotNull( lastPointer );
memoryManagerService.free( lastPointer );
@@ -139,7 +139,7 @@ public class MemoryManagerServiceImplTes
Assert.assertNotNull( pointerNotNull );
// Buffer again fully used.
- Assert.assertEquals( BUFFER_SIZE, memoryManagerService.getBuffers().get( 0 ).used() );
+ Assert.assertEquals( BUFFER_SIZE, memoryManagerService.used() );
}
Modified: incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/MemoryManagerTest.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/MemoryManagerTest.java?rev=1295522&r1=1295521&r2=1295522&view=diff
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/MemoryManagerTest.java (original)
+++ incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/MemoryManagerTest.java Thu Mar 1 11:41:34 2012
@@ -60,9 +60,9 @@ public class MemoryManagerTest
assertNotNull( p );
//assertEquals(size,p.end);
assertEquals( size, p.getCapacity() );
- assertEquals( size, MemoryManager.getActiveBuffer().used() );
+ assertEquals( size, MemoryManager.getMemoryManager().used() );
MemoryManager.free( p );
- assertEquals( 0, MemoryManager.getActiveBuffer().used() );
+ assertEquals( 0, MemoryManager.getMemoryManager().used() );
logger.info( "end" );
}
@@ -89,7 +89,7 @@ public class MemoryManagerTest
@Test
public void readTest()
{
- for ( OffHeapMemoryBuffer<Object> buffer : MemoryManager.getBuffers() )
+ for ( OffHeapMemoryBuffer<Object> buffer : ((MemoryManagerServiceImpl<Object>)MemoryManager.getMemoryManager()).getBuffers() )
{
for ( Pointer<Object> ptr : ((OffHeapMemoryBufferImpl<Object>) buffer).getPointers() )
{