You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directmemory.apache.org by bp...@apache.org on 2012/03/01 12:41:36 UTC

svn commit: r1295522 [1/2] - in /incubator/directmemory/trunk: directmemory-cache/src/main/java/org/apache/directmemory/cache/ directmemory-cache/src/main/java/org/apache/directmemory/memory/ directmemory-cache/src/main/java/org/apache/directmemory/mem...

Author: bperroud
Date: Thu Mar  1 11:41:34 2012
New Revision: 1295522

URL: http://svn.apache.org/viewvc?rev=1295522&view=rev
Log:
DIRECTMEMORY-40, DIRECTMEMORY-60: separate responsibilities more clearly, make allocation with merging pointers the default, add a SLAB-style allocator (fixed buffer size), mark OffHeapMemoryBuffer as deprecated

Added:
    incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/
    incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/AbstractByteBufferAllocator.java
    incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/ByteBufferAllocator.java
    incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/DirectByteBufferUtils.java
    incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/FixedSizeByteBufferAllocatorImpl.java
    incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/MergingByteBufferAllocatorImpl.java
    incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/SlabByteBufferAllocatorImpl.java
    incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/allocator/
    incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/allocator/FixedSizeByteBufferAllocatorImplTest.java
    incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/allocator/MergingByteBufferAllocatorImplTest.java
    incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/allocator/SlabByteBufferAllocatorImplTest.java
Removed:
    incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMergingMemoryBufferImpl.java
    incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/MemoryManagerServiceImplWithMerginOHMBAndAllocationPolicyTest.java
    incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/MemoryManagerServiceImplWithMerginOHMBTest.java
    incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/OffHeapMergingMemoryBufferTest.java
Modified:
    incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheServiceImpl.java
    incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/AllocationPolicy.java
    incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManager.java
    incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerService.java
    incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java
    incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceWithAllocationPolicyImpl.java
    incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMemoryBuffer.java
    incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMemoryBufferImpl.java
    incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/PointerImpl.java
    incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/RoundRobinAllocationPolicy.java
    incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/cache/CacheServiceImplTest.java
    incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/AbstractOffHeapMemoryBufferTest.java
    incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/Concurrent2Test.java
    incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/Concurrent3Test.java
    incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/MemoryManagerServiceImplTest.java
    incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/MemoryManagerTest.java
    incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/RoundRobinAllocationPolicyTest.java
    incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/Starter.java
    incubator/directmemory/trunk/integrations/ehcache/src/main/java/org/apache/directmemory/ehcache/DirectMemoryCache.java
    incubator/directmemory/trunk/integrations/ehcache/src/main/java/org/apache/directmemory/ehcache/DirectMemoryStore.java
    incubator/directmemory/trunk/integrations/ehcache/src/test/java/org/apache/directmemory/ehcache/EHCacheTest.java
    incubator/directmemory/trunk/itests/osgi/src/test/java/org/apache/directmemory/tests/osgi/cache/CacheServiceExportingActivator.java

Modified: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheServiceImpl.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheServiceImpl.java?rev=1295522&r1=1295521&r2=1295522&view=diff
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheServiceImpl.java (original)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheServiceImpl.java Thu Mar  1 11:41:34 2012
@@ -295,7 +295,17 @@ public class CacheServiceImpl<K, V>
         logger.info( format( "off-heap - buffer: \t%1d", mem.getBufferNumber() ) );
         logger.info( format( "off-heap - allocated: \t%1s", Ram.inMb( mem.capacity() ) ) );
         logger.info( format( "off-heap - used:      \t%1s", Ram.inMb( mem.used() ) ) );
-        logger.info( format( "heap 	- max: \t%1s", Ram.inMb( Runtime.getRuntime().maxMemory() ) ) );
+        logger.info( format( "heap  - max: \t%1s", Ram.inMb( Runtime.getRuntime().maxMemory() ) ) );
+        logger.info( format( "heap     - allocated: \t%1s", Ram.inMb( Runtime.getRuntime().totalMemory() ) ) );
+        logger.info( format( "heap     - free : \t%1s", Ram.inMb( Runtime.getRuntime().freeMemory() ) ) );
+        logger.info( "************************************************" );
+    }
+
+    public void dump( MemoryManagerService<V> mms )
+    {
+        logger.info( format( "off-heap - allocated: \t%1s", Ram.inMb( mms.capacity() ) ) );
+        logger.info( format( "off-heap - used:      \t%1s", Ram.inMb( mms.used() ) ) );
+        logger.info( format( "heap  - max: \t%1s", Ram.inMb( Runtime.getRuntime().maxMemory() ) ) );
         logger.info( format( "heap     - allocated: \t%1s", Ram.inMb( Runtime.getRuntime().totalMemory() ) ) );
         logger.info( format( "heap     - free : \t%1s", Ram.inMb( Runtime.getRuntime().freeMemory() ) ) );
         logger.info( "************************************************" );
@@ -311,10 +321,7 @@ public class CacheServiceImpl<K, V>
 
         logger.info( "*** DirectMemory statistics ********************" );
 
-        for ( OffHeapMemoryBuffer<V> mem : memoryManager.getBuffers() )
-        {
-            dump( mem );
-        }
+        dump( memoryManager );
     }
 
     @Override

Modified: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/AllocationPolicy.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/AllocationPolicy.java?rev=1295522&r1=1295521&r2=1295522&view=diff
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/AllocationPolicy.java (original)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/AllocationPolicy.java Thu Mar  1 11:41:34 2012
@@ -21,6 +21,8 @@ package org.apache.directmemory.memory;
 
 import java.util.List;
 
+import org.apache.directmemory.memory.allocator.ByteBufferAllocator;
+
 /**
  * Interface describing the buffer allocation policy.
  * The implementations will be initialized by setting the list of buffers {@link #init(List)},
@@ -30,7 +32,7 @@ import java.util.List;
  * @author bperroud
  *
  */
-public interface AllocationPolicy<T>
+public interface AllocationPolicy
 {
 
     /**
@@ -38,16 +40,16 @@ public interface AllocationPolicy<T>
      *
      * @param buffers
      */
-    void init( List<OffHeapMemoryBuffer<T>> buffers );
+    void init( List<ByteBufferAllocator> allocators );
 
     /**
-     * Returns the active buffer in which to allocate.
+     * Returns the {@link ByteBufferAllocator} to use to allocate.
      *
-     * @param previouslyAllocatedBuffer : the previously allocated buffer, or null if it's the first allocation
+     * @param previousAllocator : the previously used {@link ByteBufferAllocator}, or null if it's the first allocation
      * @param allocationNumber : the number of time the allocation has already failed.
-     * @return the buffer to allocate, or null if allocation has failed.
+     * @return the {@link ByteBufferAllocator} to use, or null if allocation has failed.
      */
-    OffHeapMemoryBuffer<T> getActiveBuffer( OffHeapMemoryBuffer<T> previouslyAllocatedBuffer, int allocationNumber );
+    ByteBufferAllocator getActiveAllocator( ByteBufferAllocator previousAllocator, int allocationNumber );
 
     /**
      * Reset internal state

Modified: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManager.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManager.java?rev=1295522&r1=1295521&r2=1295522&view=diff
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManager.java (original)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManager.java Thu Mar  1 11:41:34 2012
@@ -22,8 +22,6 @@ package org.apache.directmemory.memory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
-
 public class MemoryManager
 {
     private static Logger logger = LoggerFactory.getLogger( MemoryManager.class );
@@ -85,17 +83,6 @@ public class MemoryManager
         memoryManager.collectLFU();
     }
 
-    public static List<OffHeapMemoryBuffer<Object>> getBuffers()
-    {
-        return memoryManager.getBuffers();
-    }
-
-
-    public static OffHeapMemoryBuffer<Object> getActiveBuffer()
-    {
-        return memoryManager.getActiveBuffer();
-    }
-
     public static MemoryManagerService<Object> getMemoryManager()
     {
         return memoryManager;

Modified: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerService.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerService.java?rev=1295522&r1=1295521&r2=1295522&view=diff
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerService.java (original)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerService.java Thu Mar  1 11:41:34 2012
@@ -19,7 +19,6 @@ package org.apache.directmemory.memory;
  * under the License.
  */
 
-import java.util.List;
 
 public interface MemoryManagerService<V>
 {
@@ -79,14 +78,12 @@ public interface MemoryManagerService<V>
 
     long capacity();
 
+    long used();
+    
     long collectExpired();
 
     void collectLFU();
 
-    List<OffHeapMemoryBuffer<V>> getBuffers();
-
-    OffHeapMemoryBuffer<V> getActiveBuffer();
-
     <T extends V> Pointer<V> allocate( Class<T> type, int size, long expiresIn, long expires );
-
+    
 }

Modified: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java?rev=1295522&r1=1295521&r2=1295522&view=diff
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java (original)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java Thu Mar  1 11:41:34 2012
@@ -19,144 +19,352 @@ package org.apache.directmemory.memory;
  * under the License.
  */
 
+import static com.google.common.collect.Iterables.filter;
+import static com.google.common.collect.Iterables.limit;
+import static com.google.common.collect.Ordering.from;
 import static java.lang.String.format;
 
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.directmemory.measures.Ram;
+import org.apache.directmemory.memory.allocator.ByteBufferAllocator;
+import org.apache.directmemory.memory.allocator.MergingByteBufferAllocatorImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Predicate;
+
 public class MemoryManagerServiceImpl<V>
     implements MemoryManagerService<V>
 {
 
+    protected static final long NEVER_EXPIRES = 0L;
+    
     protected static Logger logger = LoggerFactory.getLogger( MemoryManager.class );
 
-    protected List<OffHeapMemoryBuffer<V>> buffers = new ArrayList<OffHeapMemoryBuffer<V>>();
+    private List<ByteBufferAllocator> allocators;
 
-    protected int activeBufferIndex = 0;
+    private final Set<Pointer<V>> pointers = Collections.newSetFromMap( new ConcurrentHashMap<Pointer<V>, Boolean>() );
+    
+    protected int activeAllocatorIndex = 0;
+
+    private final boolean returnNullWhenFull;
+    
+    protected final AtomicLong used = new AtomicLong( 0L );
 
     public MemoryManagerServiceImpl()
     {
+        this( true );
+    }
+    
+    public MemoryManagerServiceImpl( final boolean returnNullWhenFull )
+    {
+        this.returnNullWhenFull = returnNullWhenFull;
     }
 
+    @Override
     public void init( int numberOfBuffers, int size )
     {
-        buffers = new ArrayList<OffHeapMemoryBuffer<V>>( numberOfBuffers );
 
+        allocators = new ArrayList<ByteBufferAllocator>( numberOfBuffers );
+        
         for ( int i = 0; i < numberOfBuffers; i++ )
         {
-            final OffHeapMemoryBuffer<V> offHeapMemoryBuffer = instanciateOffHeapMemoryBuffer( size, i );
-            buffers.add( offHeapMemoryBuffer );
+            final ByteBufferAllocator allocator = instanciateByteBufferAllocator( i, size );
+            allocators.add( allocator );
         }
 
         logger.info( format( "MemoryManager initialized - %d buffers, %s each", numberOfBuffers, Ram.inMb( size ) ) );
     }
 
-    protected OffHeapMemoryBuffer<V> instanciateOffHeapMemoryBuffer( int size, int bufferNumber )
+    
+    protected ByteBufferAllocator instanciateByteBufferAllocator( final int allocatorNumber, final int size )
     {
-        return OffHeapMemoryBufferImpl.createNew( size, bufferNumber );
+        final MergingByteBufferAllocatorImpl allocator = new MergingByteBufferAllocatorImpl( allocatorNumber, size );
+        
+        // Hack to ensure the pointers are always splitted as it was the case before.
+        allocator.setMinSizeThreshold( 0.0 );
+        allocator.setSizeRatioThreshold( 1.0 );
+        
+        return allocator;
     }
 
-    public OffHeapMemoryBuffer<V> getActiveBuffer()
+    protected ByteBufferAllocator getAllocator( int allocatorIndex )
     {
-        return buffers.get( activeBufferIndex );
+        return allocators.get( allocatorIndex );
     }
 
+    @Override
     public Pointer<V> store( byte[] payload, int expiresIn )
     {
-        Pointer<V> p = getActiveBuffer().store( payload, expiresIn );
-        if ( p == null )
+        
+        int allocatorIndex = activeAllocatorIndex;
+        
+        ByteBuffer buffer = getAllocator( allocatorIndex ).allocate( payload.length );
+        
+        if (buffer == null && allocators.size() > 1)
         {
-            nextBuffer();
-            p = getActiveBuffer().store( payload, expiresIn );
+            allocatorIndex = nextAllocator();
+            buffer = getAllocator( allocatorIndex ).allocate( payload.length );
         }
+        
+        if (buffer == null)
+        {
+            if (returnsNullWhenFull())
+            {
+                return null;
+            }
+            else
+            {
+                throw new BufferOverflowException();
+            }
+        }
+        
+        buffer.rewind();
+        buffer.put( payload );
+        
+        Pointer<V> p = instanciatePointer( buffer, allocatorIndex, expiresIn, NEVER_EXPIRES );
+        
+        used.addAndGet( payload.length );
+        
         return p;
     }
 
+    @Override
     public Pointer<V> store( byte[] payload )
     {
         return store( payload, 0 );
     }
 
+    @Override
     public Pointer<V> update( Pointer<V> pointer, byte[] payload )
     {
-        return buffers.get( pointer.getBufferNumber() ).update( pointer, payload );
+        free( pointer );
+        return store( payload );
     }
 
-    public byte[] retrieve( Pointer<V> pointer )
+    @Override
+    public byte[] retrieve( final Pointer<V> pointer )
     {
-        return buffers.get( pointer.getBufferNumber() ).retrieve( pointer );
+        // check if pointer has not been freed before
+        if (!pointers.contains( pointer ))
+        {
+            return null;
+        }
+        
+        pointer.hit();
+        
+        final ByteBuffer buf = pointer.getDirectBuffer().asReadOnlyBuffer();
+        buf.rewind();
+
+        final byte[] swp = new byte[buf.limit()];
+        buf.get( swp );
+        return swp;
     }
 
-    public void free( Pointer<V> pointer )
+    @Override
+    public void free( final Pointer<V> pointer )
     {
-        buffers.get( pointer.getBufferNumber() ).free( pointer );
+        if ( !pointers.remove( pointer ) )
+        {
+            // pointers has been already freed.
+            //throw new IllegalArgumentException( "This pointer " + pointer + " has already been freed" );
+            return;
+        }
+        
+        getAllocator( pointer.getBufferNumber() ).free( pointer.getDirectBuffer() );
+        
+        used.addAndGet( - pointer.getCapacity() );
+        
+        pointer.setFree( true );
     }
 
+    @Override
     public void clear()
     {
-        for ( OffHeapMemoryBuffer<V> buffer : buffers )
+        pointers.clear();
+        for (ByteBufferAllocator allocator : allocators)
         {
-            buffer.clear();
+            allocator.clear();
         }
-        activeBufferIndex = 0;
     }
 
+    @Override
     public long capacity()
     {
         long totalCapacity = 0;
-        for ( OffHeapMemoryBuffer<V> buffer : buffers )
+        for (ByteBufferAllocator allocator : allocators)
         {
-            totalCapacity += buffer.capacity();
+            totalCapacity += allocator.getCapacity();
         }
         return totalCapacity;
     }
+    
+    @Override
+    public long used()
+    {
+        return used.get();
+    }
 
-    public long collectExpired()
+    private final Predicate<Pointer<V>> relative = new Predicate<Pointer<V>>()
     {
-        long disposed = 0;
-        for ( OffHeapMemoryBuffer<V> buffer : buffers )
+
+        @Override
+        public boolean apply( Pointer<V> input )
         {
-            disposed += buffer.collectExpired();
+            return !input.isFree() && !input.isExpired();
         }
-        return disposed;
-    }
 
-    public void collectLFU()
+    };
+
+    private final Predicate<Pointer<V>> absolute = new Predicate<Pointer<V>>()
     {
-        for ( OffHeapMemoryBuffer<V> buf : buffers )
+
+        @Override
+        public boolean apply( Pointer<V> input )
         {
-            buf.collectLFU( -1 );
+            return !input.isFree() && !input.isExpired();
         }
-    }
 
-    public List<OffHeapMemoryBuffer<V>> getBuffers()
+    };
+    
+    @Override
+    public long collectExpired()
     {
-        return buffers;
+        int limit = 50;
+        return free( limit( filter( pointers, relative ), limit ) )
+                        + free( limit( filter( pointers, absolute ), limit ) );
+        
     }
 
-    public void setBuffers( List<OffHeapMemoryBuffer<V>> buffers )
+    @Override
+    public void collectLFU()
     {
-        this.buffers = buffers;
+
+        int limit = pointers.size() / 10;
+        
+        Iterable<Pointer<V>> result = from( new Comparator<Pointer<V>>()
+        {
+
+            public int compare( Pointer<V> o1, Pointer<V> o2 )
+            {
+                float f1 = o1.getFrequency();
+                float f2 = o2.getFrequency();
+
+                return Float.compare( f1, f2 );
+            }
+
+        } ).sortedCopy( limit( filter( pointers, new Predicate<Pointer<V>>()
+        {
+
+            @Override
+            public boolean apply( Pointer<V> input )
+            {
+                return !input.isFree();
+            }
+
+        } ), limit ) );
+
+        free( result );
+        
     }
 
-    public <T extends V> Pointer<V> allocate( Class<T> type, int size, long expiresIn, long expires )
+    protected long free( Iterable<Pointer<V>> pointers )
     {
-        Pointer<V> p = getActiveBuffer().allocate( type, size, expiresIn, expires );
-        if ( p == null )
+        long howMuch = 0;
+        for ( Pointer<V> expired : pointers )
         {
-            nextBuffer();
-            p = getActiveBuffer().allocate( type, size, expiresIn, expires );
+            howMuch += expired.getCapacity();
+            free( expired );
         }
-        return p;
+        return howMuch;
+    }
+    
+    
+    protected List<ByteBufferAllocator> getAllocators()
+    {
+        return allocators;
+    }
+    
+    @Deprecated
+    public List<OffHeapMemoryBuffer<V>> getBuffers()
+    {
+        return Collections.<OffHeapMemoryBuffer<V>>emptyList();
     }
 
-    protected void nextBuffer()
+    @Deprecated
+    @Override
+    public <T extends V> Pointer<V> allocate( final Class<T> type, final int size, final long expiresIn, final long expires )
+    {
+        
+        int allocatorIndex = activeAllocatorIndex;
+        
+        ByteBuffer buffer = getAllocator( allocatorIndex ).allocate( size );
+        
+        if (buffer == null && allocators.size() > 1)
+        {
+            allocatorIndex = nextAllocator();
+            buffer = getAllocator( allocatorIndex ).allocate( size );
+        }
+        
+        if (buffer == null)
+        {
+            if (returnsNullWhenFull())
+            {
+                return null;
+            }
+            else
+            {
+                throw new BufferOverflowException();
+            }
+        }
+        
+        Pointer<V> pointer = instanciatePointer( buffer, allocatorIndex, expiresIn, expires );
+        
+        if (pointer != null)
+        {
+            pointer.setClazz( type );
+        }
+        
+        used.addAndGet( size );
+        
+        return pointer;
+        
+    }
+    
+    protected Pointer<V> instanciatePointer( final ByteBuffer buffer, final int allocatorIndex, final long expiresIn, final long expires )
+    {
+        
+        Pointer<V> p = new PointerImpl<V>();
+        
+        p.setDirectBuffer( buffer );
+        p.setExpiration( expires, expiresIn );
+        p.setBufferNumber( allocatorIndex );
+        p.setFree( false );
+        p.createdNow();
+        
+        pointers.add( p );
+        
+        return p;
+    }
+    
+    protected int nextAllocator()
+    {
+        activeAllocatorIndex = ( activeAllocatorIndex + 1 ) % allocators.size();
+        return activeAllocatorIndex;
+    }
+    
+    protected boolean returnsNullWhenFull()
     {
-        activeBufferIndex = ( activeBufferIndex + 1 ) % buffers.size();
+        return returnNullWhenFull;
     }
 
 }

Modified: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceWithAllocationPolicyImpl.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceWithAllocationPolicyImpl.java?rev=1295522&r1=1295521&r2=1295522&view=diff
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceWithAllocationPolicyImpl.java (original)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceWithAllocationPolicyImpl.java Thu Mar  1 11:41:34 2012
@@ -1,6 +1,9 @@
 package org.apache.directmemory.memory;
 
 import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+
+import org.apache.directmemory.memory.allocator.ByteBufferAllocator;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -29,53 +32,51 @@ import java.nio.BufferOverflowException;
 public class MemoryManagerServiceWithAllocationPolicyImpl<V>
     extends MemoryManagerServiceImpl<V>
 {
-
-    protected boolean returnNullWhenFull = true;
     
-    protected AllocationPolicy<V> allocationPolicy;
+    protected AllocationPolicy allocationPolicy;
 
     public MemoryManagerServiceWithAllocationPolicyImpl()
     {
         super();
     }
     
-    public MemoryManagerServiceWithAllocationPolicyImpl( final AllocationPolicy<V> allocationPolicy, final boolean returnNullWhenFull )
+    public MemoryManagerServiceWithAllocationPolicyImpl( final AllocationPolicy allocationPolicy, final boolean returnNullWhenFull )
     {
-        this();
+        super( returnNullWhenFull );
         this.setAllocationPolicy( allocationPolicy );
-        this.returnNullWhenFull = returnNullWhenFull;
     }
     
     @Override
     public void init( int numberOfBuffers, int size )
     {
         super.init( numberOfBuffers, size );
-        allocationPolicy.init( getBuffers() );
+        allocationPolicy.init( getAllocators() );
     }
 
-    public void setAllocationPolicy( final AllocationPolicy<V> allocationPolicy )
+    public void setAllocationPolicy( final AllocationPolicy allocationPolicy )
     {
         this.allocationPolicy = allocationPolicy;
     }
 
-    @Override
-    public OffHeapMemoryBuffer<V> getActiveBuffer()
+    
+    protected ByteBufferAllocator getAllocator()
     {
-        return allocationPolicy.getActiveBuffer( null, 0 );
+        return allocationPolicy.getActiveAllocator( null, 0 );
     }
 
     @Override
     public Pointer<V> store( byte[] payload, int expiresIn )
     {
         Pointer<V> p = null;
-        OffHeapMemoryBuffer<V> buffer = null;
-        int allocationNumber = 1;
+        ByteBufferAllocator allocator = null;
+        int allocationNumber = 0;
         do
         {
-            buffer = allocationPolicy.getActiveBuffer( buffer, allocationNumber );
-            if ( buffer == null )
+            allocationNumber++;
+            allocator = allocationPolicy.getActiveAllocator( allocator, allocationNumber );
+            if ( allocator == null )
             {
-                if (returnNullWhenFull)
+                if (returnsNullWhenFull())
                 {
                     return null;
                 }
@@ -84,8 +85,20 @@ public class MemoryManagerServiceWithAll
                     throw new BufferOverflowException();
                 }
             }
-            p = buffer.store( payload, expiresIn );
-            allocationNumber++;
+            final ByteBuffer buffer = allocator.allocate( payload.length );
+            
+            if ( buffer == null )
+            {
+                continue;
+            }
+            
+            p = instanciatePointer( buffer, allocator.getNumber(), expiresIn, NEVER_EXPIRES );
+
+            buffer.rewind();
+            buffer.put( payload );
+            
+            used.addAndGet( payload.length );
+            
         }
         while ( p == null );
         return p;
@@ -99,17 +112,18 @@ public class MemoryManagerServiceWithAll
     }
 
     @Override
-    public <T extends V> Pointer<V> allocate( Class<T> type, int size, long expiresIn, long expires )
+    public <T extends V> Pointer<V> allocate( final Class<T> type, final int size, final long expiresIn, final long expires )
     {
         Pointer<V> p = null;
-        OffHeapMemoryBuffer<V> buffer = null;
-        int allocationNumber = 1;
+        ByteBufferAllocator allocator = null;
+        int allocationNumber = 0;
         do
         {
-            buffer = allocationPolicy.getActiveBuffer( buffer, allocationNumber );
-            if ( buffer == null )
+            allocationNumber++;
+            allocator = allocationPolicy.getActiveAllocator( allocator, allocationNumber );
+            if ( allocator == null )
             {
-                if (returnNullWhenFull)
+                if (returnsNullWhenFull())
                 {
                     return null;
                 }
@@ -118,10 +132,22 @@ public class MemoryManagerServiceWithAll
                     throw new BufferOverflowException();
                 }
             }
-            p = buffer.allocate( type, size, expiresIn, expires );
-            allocationNumber++;
+            
+            final ByteBuffer buffer = allocator.allocate( size );
+            
+            if ( buffer == null )
+            {
+                continue;
+            }
+            
+            p = instanciatePointer( buffer, allocator.getNumber(), expiresIn, NEVER_EXPIRES );
+            
+            used.addAndGet( size );
         }
         while ( p == null );
+        
+        p.setClazz( type );
+        
         return p;
     }
 

Modified: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMemoryBuffer.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMemoryBuffer.java?rev=1295522&r1=1295521&r2=1295522&view=diff
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMemoryBuffer.java (original)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMemoryBuffer.java Thu Mar  1 11:41:34 2012
@@ -21,6 +21,7 @@ package org.apache.directmemory.memory;
 
 import java.util.Date;
 
+@Deprecated
 public interface OffHeapMemoryBuffer<T>
 {
 

Modified: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMemoryBufferImpl.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMemoryBufferImpl.java?rev=1295522&r1=1295521&r2=1295522&view=diff
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMemoryBufferImpl.java (original)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMemoryBufferImpl.java Thu Mar  1 11:41:34 2012
@@ -33,6 +33,7 @@ import org.apache.directmemory.measures.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@Deprecated
 public class OffHeapMemoryBufferImpl<T>
     extends AbstractOffHeapMemoryBuffer<T>
 {

Modified: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/PointerImpl.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/PointerImpl.java?rev=1295522&r1=1295521&r2=1295522&view=diff
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/PointerImpl.java (original)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/PointerImpl.java Thu Mar  1 11:41:34 2012
@@ -74,7 +74,7 @@ public class PointerImpl<T>
     @Override
     public int getCapacity()
     {
-        return end - start + 1;
+        return directBuffer == null ? end - start + 1 : directBuffer.limit();
     }
 
     @Override

Modified: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/RoundRobinAllocationPolicy.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/RoundRobinAllocationPolicy.java?rev=1295522&r1=1295521&r2=1295522&view=diff
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/RoundRobinAllocationPolicy.java (original)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/RoundRobinAllocationPolicy.java Thu Mar  1 11:41:34 2012
@@ -22,6 +22,8 @@ package org.apache.directmemory.memory;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.directmemory.memory.allocator.ByteBufferAllocator;
+
 /**
  * Round Robin allocation policy. An internal counter is incremented (modulo the size of the buffer), so each calls to
  * {@link #getActiveBuffer(OffHeapMemoryBuffer, int)} will increment the counter and return the buffer at the index of
@@ -29,15 +31,15 @@ import java.util.concurrent.atomic.Atomi
  *
  * @author bperroud
  */
-public class RoundRobinAllocationPolicy<T>
-    implements AllocationPolicy<T>
+public class RoundRobinAllocationPolicy
+    implements AllocationPolicy
 {
 
     // Increment the counter and get the value. Need to start at -1 to have 0'index at first call.
     private static final int BUFFERS_INDEX_INITIAL_VALUE = -1;
 
     // All the buffers to allocate
-    private List<OffHeapMemoryBuffer<T>> buffers;
+    private List<ByteBufferAllocator> allocators;
 
     // Cyclic counter
     private AtomicInteger buffersIndexCounter = new AtomicInteger( BUFFERS_INDEX_INITIAL_VALUE );
@@ -48,19 +50,19 @@ public class RoundRobinAllocationPolicy<
     // Current max number of allocations
     private int maxAllocations = DEFAULT_MAX_ALLOCATIONS;
 
-    public void setMaxAllocations( int maxAllocations )
+    public void setMaxAllocations( final int maxAllocations )
     {
         this.maxAllocations = maxAllocations;
     }
 
     @Override
-    public void init( List<OffHeapMemoryBuffer<T>> buffers )
+    public void init( final List<ByteBufferAllocator> allocators )
     {
-        this.buffers = buffers;
+        this.allocators = allocators;
     }
 
     @Override
-    public OffHeapMemoryBuffer<T> getActiveBuffer( OffHeapMemoryBuffer<T> previouslyAllocatedBuffer, int allocationNumber )
+    public ByteBufferAllocator getActiveAllocator( final ByteBufferAllocator previousAllocator, final int allocationNumber )
     {
         // If current allocation is more than the limit, return a null buffer.
         if ( allocationNumber > maxAllocations )
@@ -71,9 +73,9 @@ public class RoundRobinAllocationPolicy<
         // Thread safely increment and get the next buffer's index
         int i = incrementAndGetBufferIndex();
 
-        final OffHeapMemoryBuffer<T> buffer = buffers.get( i );
+        final ByteBufferAllocator allocator = allocators.get( i );
 
-        return buffer;
+        return allocator;
     }
 
     @Override
@@ -95,7 +97,7 @@ public class RoundRobinAllocationPolicy<
         do
         {
             int currentIndex = buffersIndexCounter.get();
-            newIndex = ( currentIndex + 1 ) % buffers.size();
+            newIndex = ( currentIndex + 1 ) % allocators.size();
             updateOk = buffersIndexCounter.compareAndSet( currentIndex, newIndex );
         }
         while ( !updateOk );

Added: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/AbstractByteBufferAllocator.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/AbstractByteBufferAllocator.java?rev=1295522&view=auto
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/AbstractByteBufferAllocator.java (added)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/AbstractByteBufferAllocator.java Thu Mar  1 11:41:34 2012
@@ -0,0 +1,39 @@
+package org.apache.directmemory.memory.allocator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public abstract class AbstractByteBufferAllocator
+    implements ByteBufferAllocator
+{
+
+    private final int number;
+    
+    AbstractByteBufferAllocator( final int number )
+    {
+        this.number = number;
+    }
+
+    @Override
+    public int getNumber()
+    {
+        return number;
+    }
+
+}

Added: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/ByteBufferAllocator.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/ByteBufferAllocator.java?rev=1295522&view=auto
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/ByteBufferAllocator.java (added)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/ByteBufferAllocator.java Thu Mar  1 11:41:34 2012
@@ -0,0 +1,53 @@
+package org.apache.directmemory.memory.allocator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.Closeable;
+import java.nio.ByteBuffer;
+
+/**
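+ * Allocator of {@link ByteBuffer}s : buffers are obtained through {@link #allocate(int)}
+ * and given back through {@link #free(ByteBuffer)} to be made available for a future usage.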
+ *
+ */
+public interface ByteBufferAllocator
+    extends Closeable
+{
+    
+    /**
+     * Return the given ByteBuffer making it available for a future usage 
+     * @param buffer
+     */
+    void free( final ByteBuffer buffer );
+    
+    /**
+     * Allocate the given size off heap, or return null if the allocation failed.
+     * @param size
+     * @return
+     */
+    ByteBuffer allocate( final int size );
+    
+    void clear();
+    
+    int getCapacity();
+    
+    int getNumber();
+    
+}

Added: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/DirectByteBufferUtils.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/DirectByteBufferUtils.java?rev=1295522&view=auto
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/DirectByteBufferUtils.java (added)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/DirectByteBufferUtils.java Thu Mar  1 11:41:34 2012
@@ -0,0 +1,28 @@
+package org.apache.directmemory.memory.allocator;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class DirectByteBufferUtils
+{
+
+    public static void destroyDirectByteBuffer( final ByteBuffer buffer )
+        throws IllegalArgumentException, IllegalAccessException, InvocationTargetException, SecurityException,
+        NoSuchMethodException
+    {
+
+        checkArgument( buffer.isDirect(), "toBeDestroyed isn't direct!" );
+
+        Method cleanerMethod = buffer.getClass().getMethod( "cleaner" );
+        cleanerMethod.setAccessible( true );
+        Object cleaner = cleanerMethod.invoke( buffer );
+        Method cleanMethod = cleaner.getClass().getMethod( "clean" );
+        cleanMethod.setAccessible( true );
+        cleanMethod.invoke( cleaner );
+
+    }
+    
+}

Added: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/FixedSizeByteBufferAllocatorImpl.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/FixedSizeByteBufferAllocatorImpl.java?rev=1295522&view=auto
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/FixedSizeByteBufferAllocatorImpl.java (added)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/FixedSizeByteBufferAllocatorImpl.java Thu Mar  1 11:41:34 2012
@@ -0,0 +1,215 @@
+package org.apache.directmemory.memory.allocator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * {@link ByteBufferAllocator} implementation that instantiates {@link ByteBuffer}s of fixed size, called slices.
+ * 
+ * @author bperroud
+ * 
+ */
+public class FixedSizeByteBufferAllocatorImpl
+    extends AbstractByteBufferAllocator
+{
+
+    protected static Logger logger = LoggerFactory.getLogger( FixedSizeByteBufferAllocatorImpl.class );
+
+    // Collection that keeps track of the parent buffers (segments) where slices are allocated
+    private final Set<ByteBuffer> segmentsBuffers = new HashSet<ByteBuffer>();
+
+    // Collection that owns all the slices that can be used.
+    private final Queue<ByteBuffer> freeBuffers = new ConcurrentLinkedQueue<ByteBuffer>();
+
+    // Size of each slice dividing each segment of the slab
+    private final int sliceSize;
+
+    // Total size of the current slab
+    private int totalSize;
+
+    // Tells if one needs to keep track of borrowed buffers
+    private boolean keepTrackOfUsedSliceBuffers = false;
+    
+    // Tells if it returns null or throws a BufferOverflowException when the requested size is bigger than the size of the slices
+    private boolean returnNullWhenOversizingSliceSize = true;
+    
+    // Tells if it returns null when no buffers are available
+    private boolean returnNullWhenNoBufferAvailable = true;
+
+    // Collection that keeps track of borrowed buffers
+    private final Set<ByteBuffer> usedSliceBuffers = Collections
+        .newSetFromMap( new ConcurrentHashMap<ByteBuffer, Boolean>() );
+
+    protected Logger getLogger()
+    {
+        return logger;
+    }
+
+    /**
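+     * Constructor. The first argument, {@code number}, is the arbitrary number of this allocator.
+     * @param totalSize : total size to allocate, divided equally among the segments
+     * @param sliceSize : size of each slice; the size of each segment is rounded down to a multiple of it
+     * @param numberOfSegments : number of direct buffers (segments) to allocate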
+     */
+    FixedSizeByteBufferAllocatorImpl( final int number, final int totalSize, final int sliceSize, final int numberOfSegments )
+    {
+        super( number );
+        
+        this.totalSize = totalSize;
+        this.sliceSize = sliceSize;
+
+        init( numberOfSegments );
+
+    }
+
+    protected void init( final int numberOfSegments )
+    {
+        // Compute the size of each segment
+        int segmentSize = totalSize / numberOfSegments;
+        // size is rounded down to a multiple of the slice size
+        segmentSize -= segmentSize % sliceSize;
+
+        for ( int i = 0; i < numberOfSegments; i++ )
+        {
+            final ByteBuffer segment = ByteBuffer.allocateDirect( segmentSize );
+            segmentsBuffers.add( segment );
+
+            for ( int j = 0; j < segment.capacity(); j += sliceSize )
+            {
+                segment.clear();
+                segment.position( j );
+                segment.limit( j + sliceSize );
+                final ByteBuffer slice = segment.slice();
+                freeBuffers.add( slice );
+            }
+        }
+    }
+    
+
+    protected ByteBuffer findFreeBuffer( int capacity )
+    {
+        if ( capacity > sliceSize )
+        {
+            if (returnNullWhenOversizingSliceSize)
+            {
+                return null;
+            }
+            else
+            {
+                throw new BufferOverflowException();
+            }
+        }
+        // TODO : Add capacity to wait till a given timeout for a freed buffer
+        return freeBuffers.poll();
+    }
+
+    @Override
+    public void free( final ByteBuffer byteBuffer )
+    {
+
+        if ( keepTrackOfUsedSliceBuffers && !usedSliceBuffers.remove( byteBuffer ) )
+        {
+            return;
+        }
+
+        Preconditions.checkArgument( byteBuffer.capacity() == sliceSize );
+
+        freeBuffers.offer( byteBuffer );
+
+    }
+
+    @Override
+    public ByteBuffer allocate( int size )
+    {
+
+        ByteBuffer allocatedByteBuffer = findFreeBuffer( size );
+
+        if ( allocatedByteBuffer == null )
+        {
+            if (returnNullWhenNoBufferAvailable)
+            {
+                return null;
+            }
+            else
+            {
+                throw new BufferOverflowException();
+            }
+        }
+
+        allocatedByteBuffer.clear();
+        allocatedByteBuffer.limit( size );
+
+        if ( keepTrackOfUsedSliceBuffers )
+        {
+            usedSliceBuffers.add( allocatedByteBuffer );
+        }
+
+        return allocatedByteBuffer;
+
+    }
+
+    public int getSliceSize()
+    {
+        return sliceSize;
+    }
+
+    @Override
+    public void clear()
+    {
+        // Nothing to do.
+    }
+
+    @Override
+    public int getCapacity()
+    {
+        return totalSize;
+    }
+    
+    @Override
+    public void close()
+    {
+        clear();
+        
+        for ( final ByteBuffer buffer : segmentsBuffers )
+        {
+            try 
+            {
+                DirectByteBufferUtils.destroyDirectByteBuffer( buffer );
+            }
+            catch (Exception e)
+            {
+                
+            }
+        }
+    }
+}

Added: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/MergingByteBufferAllocatorImpl.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/MergingByteBufferAllocatorImpl.java?rev=1295522&view=auto
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/MergingByteBufferAllocatorImpl.java (added)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/MergingByteBufferAllocatorImpl.java Thu Mar  1 11:41:34 2012
@@ -0,0 +1,421 @@
+package org.apache.directmemory.memory.allocator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.SortedMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * {@link ByteBufferAllocator} implementation with {@link ByteBuffer} merging capabilities
+ *
+ * A {@link ConcurrentSkipListMap} holds the free buffers; {@link #allocate(int)} and {@link #free(ByteBuffer)} are additionally serialized through a {@link Lock}
+ *
+ * @author bperroud
+ *
+ */
+public class MergingByteBufferAllocatorImpl
+    extends AbstractByteBufferAllocator
+{
+
+    private static final double DEFAULT_SIZE_RATIO_THRESHOLD = 0.9;
+
+    private static final int DEFAULT_MIN_SIZE_THRESHOLD = 128;
+    
+    protected static Logger logger = LoggerFactory.getLogger( MergingByteBufferAllocatorImpl.class );
+
+    // Free pointers, kept in several collections keyed by size
+    private final NavigableMap<Integer, Collection<LinkedByteBuffer>> freePointers = new ConcurrentSkipListMap<Integer, Collection<LinkedByteBuffer>>();
+
+    // Map of used pointers. The key is the identity hash code of the returned ByteBuffer.
+    private final Map<Integer, LinkedByteBuffer> usedPointers = new ConcurrentHashMap<Integer, LinkedByteBuffer>();
+
+    // Lock used instead of synchronized block to guarantee consistency when manipulating list of pointers.
+    private final Lock pointerManipulationLock = new ReentrantLock();
+
+    private final ByteBuffer parentBuffer;
+
+    // Allowed size ratio of the returned pointer before splitting the pointer
+    private double sizeRatioThreshold = DEFAULT_SIZE_RATIO_THRESHOLD;
+    
+    // Capacity a free buffer must exceed before splitting it is considered
+    private double minSizeThreshold = DEFAULT_MIN_SIZE_THRESHOLD;
+
+    private boolean returnNullWhenBufferIsFull = true;
+    
+    protected Logger getLogger()
+    {
+        return logger;
+    }
+
+    /**
+     * Constructor.
+     * @param number : arbitrary number of the allocator.
+     * @param totalSize : size of the direct buffer allocated to back this allocator.
+     */
+    public MergingByteBufferAllocatorImpl( final int number, final int totalSize )
+    {
+        super( number );
+        
+        parentBuffer = ByteBuffer.allocateDirect( totalSize );
+        init();
+    }
+
+    /**
+     * Initialization function. Creates the free collections and an initial free buffer mapping the whole parent buffer.
+     */
+    protected void init()
+    {
+        Integer totalSize = Integer.valueOf( parentBuffer.capacity() );
+
+        for ( Integer i : generateQueueSizes( totalSize ) )
+        {
+            freePointers.put( Integer.valueOf( i ), new LinkedHashSet<LinkedByteBuffer>() );
+        }
+
+        initFirstBuffer();
+
+    }
+
+    private void initFirstBuffer()
+    {
+        parentBuffer.clear();
+        final ByteBuffer initialBuffer = parentBuffer.slice();
+        final LinkedByteBuffer initialLinkedBuffer = new LinkedByteBuffer( 0, initialBuffer, null, null );
+
+        insertLinkedBuffer( initialLinkedBuffer );
+    }
+    
+    
+    protected List<Integer> generateQueueSizes( final Integer totalSize )
+    {
+        List<Integer> sizes = new ArrayList<Integer>();
+
+        for ( int i = 128; i <= totalSize; i *= 8 )
+        {
+            sizes.add( Integer.valueOf( i ) );
+        }
+
+        // If totalSize < 128 or totalSize is not one of the generated sizes (128, 1024, 8192, ...)
+        // we force adding an element to the map
+        if ( sizes.isEmpty() || !sizes.contains( totalSize ) )
+        {
+            sizes.add( totalSize );
+        }
+
+        return sizes;
+    }
+
+    @Override
+    public void free( final ByteBuffer buffer )
+    {
+        
+        LinkedByteBuffer returningLinkedBuffer = usedPointers.remove( getHash( buffer ) );
+        
+        if ( returningLinkedBuffer == null )
+        {
+            // Hu ? returned twice ? Not returned at the right place ?
+            throw new IllegalArgumentException( "The buffer " + buffer + " seems not to belong to this allocator" );
+        }
+
+        try
+        {
+            pointerManipulationLock.lock();
+
+            if ( returningLinkedBuffer.getBefore() != null )
+            {
+                // if returningLinkedBuffer.getBefore() is in the free list, then it is free and can be merged
+                if (getFreeLinkedByteBufferCollection( returningLinkedBuffer.getBefore() ).contains( returningLinkedBuffer.getBefore() ) )
+                {
+                    returningLinkedBuffer = mergePointer( returningLinkedBuffer.getBefore(), returningLinkedBuffer );
+                }
+            }
+
+            if ( returningLinkedBuffer.getAfter() != null )
+            {
+                // if returningLinkedBuffer.getAfter() is in the free list, then it is free and can be merged
+                if (getFreeLinkedByteBufferCollection( returningLinkedBuffer.getAfter() ).contains( returningLinkedBuffer.getAfter() ) )
+                {
+                    returningLinkedBuffer = mergePointer( returningLinkedBuffer, returningLinkedBuffer.getAfter() );
+                }
+            }
+
+            insertLinkedBuffer( returningLinkedBuffer );
+        }
+        finally
+        {
+            pointerManipulationLock.unlock();
+        }
+    }
+
+    @Override
+    public ByteBuffer allocate( final int size )
+    {
+
+        try
+        {
+            pointerManipulationLock.lock();
+
+            final SortedMap<Integer, Collection<LinkedByteBuffer>> freeMap = freePointers
+                .tailMap( size - 1 );
+            for ( final Map.Entry<Integer, Collection<LinkedByteBuffer>> bufferQueueEntry : freeMap.entrySet() )
+            {
+
+                Iterator<LinkedByteBuffer> linkedByteBufferIterator = bufferQueueEntry.getValue().iterator();
+
+                while ( linkedByteBufferIterator.hasNext() )
+                {
+                    LinkedByteBuffer linkedBuffer = linkedByteBufferIterator.next();
+
+                    if ( linkedBuffer.getBuffer().capacity() >= size )
+                    {
+                        // Remove this element from the collection
+                        linkedByteBufferIterator.remove();
+
+                        LinkedByteBuffer returnedLinkedBuffer = linkedBuffer;
+
+                        // Check if splitting need to be performed
+                        if ( linkedBuffer.getBuffer().capacity() > minSizeThreshold
+                            && ( 1.0 * size / linkedBuffer.getBuffer().capacity() ) < sizeRatioThreshold )
+                        {
+                            // Split the buffer in a buffer that will be returned and another buffer reinserted in the corresponding queue.
+                            parentBuffer.clear();
+                            parentBuffer.position( linkedBuffer.getOffset() );
+                            parentBuffer.limit( linkedBuffer.getOffset() + size );
+                            final ByteBuffer newBuffer = parentBuffer.slice();
+
+                            returnedLinkedBuffer = new LinkedByteBuffer( linkedBuffer.getOffset(), newBuffer,
+                                                                         linkedBuffer.getBefore(), null );
+                            
+                            if (linkedBuffer.getBefore() != null)
+                            {
+                                linkedBuffer.getBefore().setAfter( returnedLinkedBuffer );
+                            }
+                            
+                            parentBuffer.clear();
+                            parentBuffer.position( linkedBuffer.getOffset() + size );
+                            parentBuffer.limit( linkedBuffer.getOffset() + linkedBuffer.getBuffer().capacity() );
+                            final ByteBuffer remainingBuffer = parentBuffer.slice();
+                            final LinkedByteBuffer remainingLinkedBuffer = new LinkedByteBuffer(
+                                         linkedBuffer.getOffset() + size, remainingBuffer,
+                                         returnedLinkedBuffer, linkedBuffer.getAfter() );
+                            
+                            if (linkedBuffer.getAfter() != null)
+                            {
+                                linkedBuffer.getAfter().setBefore( remainingLinkedBuffer );
+                            }
+                            
+                            returnedLinkedBuffer.setAfter( remainingLinkedBuffer );
+                            
+                            insertLinkedBuffer( remainingLinkedBuffer );
+                            
+                        }
+                        else
+                        {
+                            // If the buffer is not split, set the limit accordingly
+                            returnedLinkedBuffer.getBuffer().clear();
+                            returnedLinkedBuffer.getBuffer().limit( size );
+                        }
+
+                        usedPointers.put( getHash( returnedLinkedBuffer.getBuffer() ), returnedLinkedBuffer );
+
+                        return returnedLinkedBuffer.getBuffer();
+                    }
+
+                }
+            }
+
+            if (returnNullWhenBufferIsFull)
+            {
+                return null;
+            }
+            else
+            {
+                throw new BufferOverflowException();
+            }
+            
+        }
+        finally
+        {
+            pointerManipulationLock.unlock();
+        }
+    }
+
+    @Override
+    public void clear()
+    {
+        usedPointers.clear();
+
+        for ( final Map.Entry<Integer, Collection<LinkedByteBuffer>> bufferQueueEntry : freePointers.entrySet() )
+        {
+            bufferQueueEntry.getValue().clear();
+        }
+        
+        initFirstBuffer();
+    }
+
+    private void insertLinkedBuffer( final LinkedByteBuffer linkedBuffer )
+    {
+        getFreeLinkedByteBufferCollection( linkedBuffer ).add( linkedBuffer );
+    }
+
+    private Collection<LinkedByteBuffer> getFreeLinkedByteBufferCollection( final LinkedByteBuffer linkedBuffer )
+    {
+        final Integer size = Integer.valueOf( linkedBuffer.getBuffer().capacity() - 1 );
+        final Map.Entry<Integer, Collection<LinkedByteBuffer>> bufferCollectionEntry = freePointers.ceilingEntry( size );
+        return bufferCollectionEntry.getValue();
+    }
+
+    private LinkedByteBuffer mergePointer( final LinkedByteBuffer first, final LinkedByteBuffer next )
+    {
+        parentBuffer.clear();
+        parentBuffer.position( first.getOffset() );
+        parentBuffer.limit( first.getOffset() + first.getBuffer().capacity() + next.getBuffer().capacity() );
+        final ByteBuffer newByteBuffer = parentBuffer.slice();
+
+        final LinkedByteBuffer newLinkedByteBuffer = new LinkedByteBuffer( first.getOffset(), newByteBuffer,
+                                                                           first.getBefore(), next.getAfter() );
+
+        if ( first.getBefore() != null )
+        {
+            first.getBefore().setAfter( newLinkedByteBuffer );
+        }
+
+        if ( next.getAfter() != null )
+        {
+            next.getAfter().setBefore( newLinkedByteBuffer );
+        }
+
+        // Remove the two pointers from their corresponding free lists.
+        getFreeLinkedByteBufferCollection( first ).remove( first );
+        getFreeLinkedByteBufferCollection( next ).remove( next );
+        
+        return newLinkedByteBuffer;
+    }
+
+    private static Integer getHash( final ByteBuffer buffer )
+    {
+        return Integer.valueOf( System.identityHashCode( buffer ) );
+    }
+    
+    public void setSizeRatioThreshold( final double sizeRatioThreshold )
+    {
+        this.sizeRatioThreshold = sizeRatioThreshold;
+    }
+    
+    public void setMinSizeThreshold( final double minSizeThreshold )
+    {
+        this.minSizeThreshold = minSizeThreshold;
+    }
+
+    public void setReturnNullWhenBufferIsFull( boolean returnNullWhenBufferIsFull )
+    {
+        this.returnNullWhenBufferIsFull = returnNullWhenBufferIsFull;
+    }
+
+    @Override
+    public int getCapacity()
+    {
+        return parentBuffer.capacity();
+    }
+    
+    private static class LinkedByteBuffer
+    {
+        private final int offset;
+
+        private final ByteBuffer buffer;
+
+        private volatile LinkedByteBuffer before;
+
+        private volatile LinkedByteBuffer after;
+
+        public LinkedByteBuffer( final int offset, final ByteBuffer buffer, final LinkedByteBuffer before,
+                                 final LinkedByteBuffer after )
+        {
+            this.offset = offset;
+            this.buffer = buffer;
+            setBefore( before );
+            setAfter( after );
+        }
+
+        public ByteBuffer getBuffer()
+        {
+            return buffer;
+        }
+
+        public int getOffset()
+        {
+            return offset;
+        }
+
+        public LinkedByteBuffer getBefore()
+        {
+            return before;
+        }
+
+        public void setBefore( final LinkedByteBuffer before )
+        {
+            this.before = before;
+        }
+
+        public LinkedByteBuffer getAfter()
+        {
+            return after;
+        }
+
+        public void setAfter( final LinkedByteBuffer after )
+        {
+            this.after = after;
+        }
+    }
+
+    @Override
+    public void close()
+        throws IOException
+    {
+        clear();
+        
+        try {
+            DirectByteBufferUtils.destroyDirectByteBuffer( parentBuffer );
+        }
+        catch (Exception e)
+        {
+            
+        }
+    }
+
+    
+}

Added: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/SlabByteBufferAllocatorImpl.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/SlabByteBufferAllocatorImpl.java?rev=1295522&view=auto
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/SlabByteBufferAllocatorImpl.java (added)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/SlabByteBufferAllocatorImpl.java Thu Mar  1 11:41:34 2012
@@ -0,0 +1,215 @@
+package org.apache.directmemory.memory.allocator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * {@link ByteBufferAllocator} implementation that delegates to several {@link FixedSizeByteBufferAllocatorImpl}
+ * ("slabs") of different slice sizes and serves each request from the slab whose slices best match the size.
+ * 
+ * @author bperroud
+ * 
+ */
+public class SlabByteBufferAllocatorImpl
+    extends AbstractByteBufferAllocator
+{
+
+    protected static Logger logger = LoggerFactory.getLogger( SlabByteBufferAllocatorImpl.class );
+
+    // Tells if it returns null (instead of throwing) when no buffers are available
+    private boolean returnNullWhenNoBufferAvailable = true;
+    
+    // Internal slabs sorted by sliceSize
+    private final NavigableMap<Integer, FixedSizeByteBufferAllocatorImpl> slabs = new ConcurrentSkipListMap<Integer, FixedSizeByteBufferAllocatorImpl>();
+    
+    // Tells if a request may fall back to the next bigger slab when its best matching slab can't serve it
+    private final boolean allowAllocationToBiggerSlab;
+
+    protected Logger getLogger()
+    {
+        return logger;
+    }
+
+    /**
+     * Constructor.
+     * @param number : the number of this allocator, passed as is to {@link AbstractByteBufferAllocator}.
+     * @param slabs : the fixed size allocators to delegate to, indexed here by their slice size.
+     * @param allowAllocationToBiggerSlab : whether the next bigger slab is tried when the best matching one fails.
+     */
+    SlabByteBufferAllocatorImpl( final int number, final Collection<FixedSizeByteBufferAllocatorImpl> slabs, final boolean allowAllocationToBiggerSlab )
+    {
+        super( number );
+        
+        this.allowAllocationToBiggerSlab = allowAllocationToBiggerSlab;
+
+        for (FixedSizeByteBufferAllocatorImpl slab : slabs)
+        {
+            // Slabs are indexed by slice size : of two slabs with the same slice size, only the last one is kept
+            this.slabs.put( slab.getSliceSize(), slab );
+        }
+
+    }
+
+    /**
+     * Finds the slab with the smallest slices able to carry the wanted size.
+     * @return the best matching slab, or null when no slab has slices big enough.
+     */
+    private FixedSizeByteBufferAllocatorImpl getSlabThatMatchTheSize( final int size )
+    {
+        // ceilingEntry returns the entry with the least key greater than or equal to the size
+        final Map.Entry<Integer, FixedSizeByteBufferAllocatorImpl> entry = slabs.ceilingEntry( size );
+
+        return entry == null ? null : entry.getValue();
+    }
+
+    /**
+     * Reports that a request can't be fulfilled.
+     * @return null if returnNullWhenNoBufferAvailable is set, throws a {@link BufferOverflowException} otherwise.
+     */
+    private ByteBuffer noBufferAvailable()
+    {
+        if (returnNullWhenNoBufferAvailable)
+        {
+            return null;
+        }
+        throw new BufferOverflowException();
+    }
+
+    /** Gives the buffer back to the slab matching its capacity. */
+    @Override
+    public void free( final ByteBuffer byteBuffer )
+    {
+        final FixedSizeByteBufferAllocatorImpl slab = getSlabThatMatchTheSize( byteBuffer.capacity() );
+
+        if (slab == null)
+        {
+            // Hu ? where this bytebuffer come from ?? No slab can match it, so it is ignored
+            return;
+        }
+        
+        slab.free( byteBuffer );
+    }
+
+    /**
+     * Allocates in the best matching slab or, if allowed and that slab can't serve it, in the next bigger one.
+     * @param size : the wanted size.
+     * @return the buffer, or what {@link #noBufferAvailable()} decides when the request can't be fulfilled.
+     */
+    @Override
+    public ByteBuffer allocate( final int size )
+    {
+        final FixedSizeByteBufferAllocatorImpl slab = getSlabThatMatchTheSize( size );
+
+        if ( slab == null )
+        {
+            // unable to store such big objects
+            return noBufferAvailable();
+        }
+
+        // Try to allocate the given size
+        final ByteBuffer byteBuffer = slab.allocate( size );
+        
+        // If allocation succeed, return the buffer
+        if (byteBuffer != null)
+        {
+            return byteBuffer;
+        }
+        
+        // Otherwise we have the option to allow in a bigger slab.
+        if (!allowAllocationToBiggerSlab)
+        {
+            return noBufferAvailable();
+        }
+        // higherEntry is strict : it gives the next slab with bigger slices, never the one just tried
+        final Map.Entry<Integer, FixedSizeByteBufferAllocatorImpl> biggerEntry = slabs.higherEntry( slab.getSliceSize() );
+        if (biggerEntry == null)
+        {
+            // We were already trying to allocate in the biggest slab
+            return noBufferAvailable();
+        }
+        final ByteBuffer secondByteBuffer = biggerEntry.getValue().allocate( size );
+
+        return secondByteBuffer != null ? secondByteBuffer : noBufferAvailable();
+    }
+
+    @Override
+    public void clear()
+    {
+        // This allocator holds no buffer of its own : clearing it means clearing every slab
+        for (final FixedSizeByteBufferAllocatorImpl slab : slabs.values())
+        {
+            slab.clear();
+        }
+    }
+
+    /** @return the sum of the capacities of all the slabs. */
+    @Override
+    public int getCapacity()
+    {
+        int totalSize = 0;
+        for (final FixedSizeByteBufferAllocatorImpl slab : slabs.values())
+        {
+            totalSize += slab.getCapacity();
+        }
+        return totalSize;
+    }
+
+    /** Closes every slab, even when closing one of them fails, then rethrows the first failure if any. */
+    @Override
+    public void close()
+        throws IOException
+    {
+        Exception firstFailure = null;
+        for (final FixedSizeByteBufferAllocatorImpl slab : slabs.values())
+        {
+            try
+            {
+                slab.close();
+            }
+            catch (Exception e)
+            {
+                // Keep closing the remaining slabs so that they get released anyway
+                if (firstFailure == null)
+                {
+                    firstFailure = e;
+                }
+            }
+        }
+        if (firstFailure instanceof IOException)
+        {
+            throw (IOException) firstFailure;
+        }
+        if (firstFailure instanceof RuntimeException)
+        {
+            throw (RuntimeException) firstFailure;
+        }
+    }
+}

Modified: incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/cache/CacheServiceImplTest.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/cache/CacheServiceImplTest.java?rev=1295522&r1=1295521&r2=1295522&view=diff
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/cache/CacheServiceImplTest.java (original)
+++ incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/cache/CacheServiceImplTest.java Thu Mar  1 11:41:34 2012
@@ -36,7 +36,7 @@ public class CacheServiceImplTest
     @Test
     public void testOffHeapExceedMemoryReturnNullWhenTrue()
     {
-        AllocationPolicy<byte[]> allocationPolicy = new RoundRobinAllocationPolicy<byte[]>();
+        AllocationPolicy allocationPolicy = new RoundRobinAllocationPolicy();
         MemoryManagerService<byte[]> memoryManager =
             new MemoryManagerServiceWithAllocationPolicyImpl<byte[]>( allocationPolicy, true );
         CacheService<Integer, byte[]> cache = new DirectMemory<Integer, byte[]>()

Modified: incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/AbstractOffHeapMemoryBufferTest.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/AbstractOffHeapMemoryBufferTest.java?rev=1295522&r1=1295521&r2=1295522&view=diff
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/AbstractOffHeapMemoryBufferTest.java (original)
+++ incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/AbstractOffHeapMemoryBufferTest.java Thu Mar  1 11:41:34 2012
@@ -27,7 +27,6 @@ import java.util.Random;
 import junit.framework.Assert;
 
 import org.apache.directmemory.memory.MemoryManagerService;
-import org.apache.directmemory.memory.OffHeapMergingMemoryBufferImpl;
 import org.apache.directmemory.memory.OffHeapMemoryBuffer;
 import org.apache.directmemory.memory.Pointer;
 import org.junit.Test;
@@ -314,43 +313,6 @@ public abstract class AbstractOffHeapMem
     }
 
     @Test
-    public void testRandomPayload2()
-    {
-
-        final int NUMBER_OF_OBJECTS = 10;
-        final int BUFFER_SIZE = NUMBER_OF_OBJECTS * SMALL_PAYLOAD_LENGTH;
-
-        final OffHeapMergingMemoryBufferImpl<Object> offHeapLinkedMemoryBuffer = OffHeapMergingMemoryBufferImpl
-            .createNew( BUFFER_SIZE );
-
-        byte[] payload1 = MemoryTestUtils.generateRandomPayload( 2 * SMALL_PAYLOAD_LENGTH );
-        Pointer<Object> pointer1 = offHeapLinkedMemoryBuffer.store( payload1 );
-        Assert.assertNotNull( pointer1 );
-
-        byte[] fetchedPayload1 = offHeapLinkedMemoryBuffer.retrieve( pointer1 );
-        Assert.assertEquals( new String( payload1 ), new String( fetchedPayload1 ) );
-
-        byte[] payload2 = MemoryTestUtils.generateRandomPayload( SMALL_PAYLOAD_LENGTH );
-        Pointer<Object> pointer2 = offHeapLinkedMemoryBuffer.store( payload2 );
-        Assert.assertNotNull( pointer2 );
-
-        byte[] fetchedPayload2 = offHeapLinkedMemoryBuffer.retrieve( pointer2 );
-        Assert.assertEquals( new String( payload2 ), new String( fetchedPayload2 ) );
-
-        offHeapLinkedMemoryBuffer.free( pointer1 );
-
-        offHeapLinkedMemoryBuffer.free( pointer1 );
-
-        byte[] payload3 = MemoryTestUtils.generateRandomPayload( SMALL_PAYLOAD_LENGTH );
-        Pointer<Object> pointer3 = offHeapLinkedMemoryBuffer.store( payload3 );
-        Assert.assertNotNull( pointer3 );
-
-        byte[] fetchedPayload3 = offHeapLinkedMemoryBuffer.retrieve( pointer3 );
-        Assert.assertEquals( new String( payload3 ), new String( fetchedPayload3 ) );
-
-    }
-
-    @Test
     public void testUpdate()
     {
 

Modified: incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/Concurrent2Test.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/Concurrent2Test.java?rev=1295522&r1=1295521&r2=1295522&view=diff
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/Concurrent2Test.java (original)
+++ incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/Concurrent2Test.java Thu Mar  1 11:41:34 2012
@@ -28,7 +28,6 @@ import com.carrotsearch.junitbenchmarks.
 import com.google.common.collect.MapMaker;
 import org.apache.directmemory.measures.Ram;
 import org.apache.directmemory.memory.MemoryManager;
-import org.apache.directmemory.memory.OffHeapMemoryBuffer;
 import org.apache.directmemory.memory.Pointer;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -204,11 +203,10 @@ public class Concurrent2Test
 
     private static Logger logger = LoggerFactory.getLogger( Concurrent2Test.class );
 
-    private static void dump( OffHeapMemoryBuffer<Object> mem )
+    private static void dump( MemoryManagerService<Object> mms )
     {
-        logger.info( "off-heap - buffer: " + mem.getBufferNumber() );
-        logger.info( "off-heap - allocated: " + Ram.inMb( mem.capacity() ) );
-        logger.info( "off-heap - used:      " + Ram.inMb( mem.used() ) );
+        logger.info( "off-heap - allocated: " + Ram.inMb( mms.capacity() ) );
+        logger.info( "off-heap - used:      " + Ram.inMb( mms.used() ) );
         logger.info( "heap 	  - max: " + Ram.inMb( Runtime.getRuntime().maxMemory() ) );
         logger.info( "heap     - allocated: " + Ram.inMb( Runtime.getRuntime().totalMemory() ) );
         logger.info( "heap     - free : " + Ram.inMb( Runtime.getRuntime().freeMemory() ) );
@@ -225,10 +223,7 @@ public class Concurrent2Test
     public static void dump()
     {
 
-        for ( OffHeapMemoryBuffer<Object> mem : MemoryManager.getBuffers() )
-        {
-            dump( mem );
-        }
+        dump( MemoryManager.getMemoryManager() );
 
         logger.info( "************************************************" );
         logger.info( "entries: " + entries );

Modified: incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/Concurrent3Test.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/Concurrent3Test.java?rev=1295522&r1=1295521&r2=1295522&view=diff
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/Concurrent3Test.java (original)
+++ incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/Concurrent3Test.java Thu Mar  1 11:41:34 2012
@@ -27,7 +27,6 @@ import com.carrotsearch.junitbenchmarks.
 import com.google.common.collect.MapMaker;
 import org.apache.directmemory.measures.Ram;
 import org.apache.directmemory.memory.MemoryManager;
-import org.apache.directmemory.memory.OffHeapMemoryBuffer;
 import org.apache.directmemory.memory.Pointer;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -226,12 +225,11 @@ public class Concurrent3Test
 
     private static Logger logger = LoggerFactory.getLogger( Concurrent3Test.class );
 
-    private static void dump( OffHeapMemoryBuffer<Object> mem )
+    private static void dump( MemoryManagerService<Object> mms )
     {
-        logger.info( "off-heap - buffer: " + mem.getBufferNumber() );
-        logger.info( "off-heap - allocated: " + Ram.inMb( mem.capacity() ) );
-        logger.info( "off-heap - used:      " + Ram.inMb( mem.used() ) );
-        logger.info( "heap 	  - max: " + Ram.inMb( Runtime.getRuntime().maxMemory() ) );
+        logger.info( "off-heap - allocated: " + Ram.inMb( mms.capacity() ) );
+        logger.info( "off-heap - used:      " + Ram.inMb( mms.used() ) );
+        logger.info( "heap    - max: " + Ram.inMb( Runtime.getRuntime().maxMemory() ) );
         logger.info( "heap     - allocated: " + Ram.inMb( Runtime.getRuntime().totalMemory() ) );
         logger.info( "heap     - free : " + Ram.inMb( Runtime.getRuntime().freeMemory() ) );
         logger.info( "************************************************" );
@@ -247,10 +245,7 @@ public class Concurrent3Test
     public static void dump()
     {
 
-        for ( OffHeapMemoryBuffer<Object> mem : MemoryManager.getBuffers() )
-        {
-            dump( mem );
-        }
+        dump( MemoryManager.getMemoryManager() );
 
         logger.info( "************************************************" );
         logger.info( "entries: " + entries );

Modified: incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/MemoryManagerServiceImplTest.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/MemoryManagerServiceImplTest.java?rev=1295522&r1=1295521&r2=1295522&view=diff
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/MemoryManagerServiceImplTest.java (original)
+++ incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/MemoryManagerServiceImplTest.java Thu Mar  1 11:41:34 2012
@@ -130,7 +130,7 @@ public class MemoryManagerServiceImplTes
         }
 
         // Buffer is fully used.
-        Assert.assertEquals( BUFFER_SIZE, memoryManagerService.getBuffers().get( 0 ).used() );
+        Assert.assertEquals( BUFFER_SIZE, memoryManagerService.used() );
 
         Assert.assertNotNull( lastPointer );
         memoryManagerService.free( lastPointer );
@@ -139,7 +139,7 @@ public class MemoryManagerServiceImplTes
         Assert.assertNotNull( pointerNotNull );
 
         // Buffer again fully used.
-        Assert.assertEquals( BUFFER_SIZE, memoryManagerService.getBuffers().get( 0 ).used() );
+        Assert.assertEquals( BUFFER_SIZE, memoryManagerService.used() );
 
     }
 

Modified: incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/MemoryManagerTest.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/MemoryManagerTest.java?rev=1295522&r1=1295521&r2=1295522&view=diff
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/MemoryManagerTest.java (original)
+++ incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/MemoryManagerTest.java Thu Mar  1 11:41:34 2012
@@ -60,9 +60,9 @@ public class MemoryManagerTest
         assertNotNull( p );
         //assertEquals(size,p.end);
         assertEquals( size, p.getCapacity() );
-        assertEquals( size, MemoryManager.getActiveBuffer().used() );
+        assertEquals( size, MemoryManager.getMemoryManager().used() );
         MemoryManager.free( p );
-        assertEquals( 0, MemoryManager.getActiveBuffer().used() );
+        assertEquals( 0, MemoryManager.getMemoryManager().used() );
         logger.info( "end" );
     }
 
@@ -89,7 +89,7 @@ public class MemoryManagerTest
     @Test
     public void readTest()
     {
-        for ( OffHeapMemoryBuffer<Object> buffer : MemoryManager.getBuffers() )
+        for ( OffHeapMemoryBuffer<Object> buffer : ((MemoryManagerServiceImpl<Object>)MemoryManager.getMemoryManager()).getBuffers() )
         {
             for ( Pointer<Object> ptr : ((OffHeapMemoryBufferImpl<Object>) buffer).getPointers() )
             {