You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@directmemory.apache.org by Benoit Perroud <be...@noisette.ch> on 2012/03/01 12:54:00 UTC
Re: svn commit: r1295522 [1/2] - in /incubator/directmemory/trunk:
directmemory-cache/src/main/java/org/apache/directmemory/cache/
directmemory-cache/src/main/java/org/apache/directmemory/memory/ directmemory-cache/src/main/java/org/apache/directmemo
Hi All,
I was able to release the new ByteBufferAllocator interface
and two of it's implementation : merging and slab's style allocation.
All feedbacks welcome.
Some work remain on finding an easy way to switch from one
implementation to the other (using a builder ?) and regarding
documentation. I will actively act on the second one, and if someone
has an idea for the first one don't hesitate.
A small side note I will also put on the wiki : if you want to run DM
with big number, you need to update the -XX:MaxDirectMemorySize
parameter, for instance I'm running with -XX:MaxDirectMemorySize=2000m
Best,
Benoit.
2012/3/1 <bp...@apache.org>:
> Author: bperroud
> Date: Thu Mar 1 11:41:34 2012
> New Revision: 1295522
>
> URL: http://svn.apache.org/viewvc?rev=1295522&view=rev
> Log:
> DIRECTMEMORY-40, DIRECTMEMORY-60 : separate responsabilities more clearly, set allocation with merging pointers as the default one, add a SLAB's style allocator (fixed buffer size), mark OffHeapMemoryBuffer as deprecated
>
> Added:
> incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/
> incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/AbstractByteBufferAllocator.java
> incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/ByteBufferAllocator.java
> incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/DirectByteBufferUtils.java
> incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/FixedSizeByteBufferAllocatorImpl.java
> incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/MergingByteBufferAllocatorImpl.java
> incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/SlabByteBufferAllocatorImpl.java
> incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/allocator/
> incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/allocator/FixedSizeByteBufferAllocatorImplTest.java
> incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/allocator/MergingByteBufferAllocatorImplTest.java
> incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/allocator/SlabByteBufferAllocatorImplTest.java
> Removed:
> incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMergingMemoryBufferImpl.java
> incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/MemoryManagerServiceImplWithMerginOHMBAndAllocationPolicyTest.java
> incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/MemoryManagerServiceImplWithMerginOHMBTest.java
> incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/OffHeapMergingMemoryBufferTest.java
> Modified:
> incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheServiceImpl.java
> incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/AllocationPolicy.java
> incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManager.java
> incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerService.java
> incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java
> incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceWithAllocationPolicyImpl.java
> incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMemoryBuffer.java
> incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMemoryBufferImpl.java
> incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/PointerImpl.java
> incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/RoundRobinAllocationPolicy.java
> incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/cache/CacheServiceImplTest.java
> incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/AbstractOffHeapMemoryBufferTest.java
> incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/Concurrent2Test.java
> incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/Concurrent3Test.java
> incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/MemoryManagerServiceImplTest.java
> incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/MemoryManagerTest.java
> incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/RoundRobinAllocationPolicyTest.java
> incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/Starter.java
> incubator/directmemory/trunk/integrations/ehcache/src/main/java/org/apache/directmemory/ehcache/DirectMemoryCache.java
> incubator/directmemory/trunk/integrations/ehcache/src/main/java/org/apache/directmemory/ehcache/DirectMemoryStore.java
> incubator/directmemory/trunk/integrations/ehcache/src/test/java/org/apache/directmemory/ehcache/EHCacheTest.java
> incubator/directmemory/trunk/itests/osgi/src/test/java/org/apache/directmemory/tests/osgi/cache/CacheServiceExportingActivator.java
>
> Modified: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheServiceImpl.java
> URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheServiceImpl.java?rev=1295522&r1=1295521&r2=1295522&view=diff
> ==============================================================================
> --- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheServiceImpl.java (original)
> +++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheServiceImpl.java Thu Mar 1 11:41:34 2012
> @@ -295,7 +295,17 @@ public class CacheServiceImpl<K, V>
> logger.info( format( "off-heap - buffer: \t%1d", mem.getBufferNumber() ) );
> logger.info( format( "off-heap - allocated: \t%1s", Ram.inMb( mem.capacity() ) ) );
> logger.info( format( "off-heap - used: \t%1s", Ram.inMb( mem.used() ) ) );
> - logger.info( format( "heap - max: \t%1s", Ram.inMb( Runtime.getRuntime().maxMemory() ) ) );
> + logger.info( format( "heap - max: \t%1s", Ram.inMb( Runtime.getRuntime().maxMemory() ) ) );
> + logger.info( format( "heap - allocated: \t%1s", Ram.inMb( Runtime.getRuntime().totalMemory() ) ) );
> + logger.info( format( "heap - free : \t%1s", Ram.inMb( Runtime.getRuntime().freeMemory() ) ) );
> + logger.info( "************************************************" );
> + }
> +
> + public void dump( MemoryManagerService<V> mms )
> + {
> + logger.info( format( "off-heap - allocated: \t%1s", Ram.inMb( mms.capacity() ) ) );
> + logger.info( format( "off-heap - used: \t%1s", Ram.inMb( mms.used() ) ) );
> + logger.info( format( "heap - max: \t%1s", Ram.inMb( Runtime.getRuntime().maxMemory() ) ) );
> logger.info( format( "heap - allocated: \t%1s", Ram.inMb( Runtime.getRuntime().totalMemory() ) ) );
> logger.info( format( "heap - free : \t%1s", Ram.inMb( Runtime.getRuntime().freeMemory() ) ) );
> logger.info( "************************************************" );
> @@ -311,10 +321,7 @@ public class CacheServiceImpl<K, V>
>
> logger.info( "*** DirectMemory statistics ********************" );
>
> - for ( OffHeapMemoryBuffer<V> mem : memoryManager.getBuffers() )
> - {
> - dump( mem );
> - }
> + dump( memoryManager );
> }
>
> @Override
>
> Modified: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/AllocationPolicy.java
> URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/AllocationPolicy.java?rev=1295522&r1=1295521&r2=1295522&view=diff
> ==============================================================================
> --- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/AllocationPolicy.java (original)
> +++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/AllocationPolicy.java Thu Mar 1 11:41:34 2012
> @@ -21,6 +21,8 @@ package org.apache.directmemory.memory;
>
> import java.util.List;
>
> +import org.apache.directmemory.memory.allocator.ByteBufferAllocator;
> +
> /**
> * Interface describing the buffer allocation policy.
> * The implementations will be initialized by setting the list of buffers {@link #init(List)},
> @@ -30,7 +32,7 @@ import java.util.List;
> * @author bperroud
> *
> */
> -public interface AllocationPolicy<T>
> +public interface AllocationPolicy
> {
>
> /**
> @@ -38,16 +40,16 @@ public interface AllocationPolicy<T>
> *
> * @param buffers
> */
> - void init( List<OffHeapMemoryBuffer<T>> buffers );
> + void init( List<ByteBufferAllocator> allocators );
>
> /**
> - * Returns the active buffer in which to allocate.
> + * Returns the {@link ByteBufferAllocator} to use to allocate.
> *
> - * @param previouslyAllocatedBuffer : the previously allocated buffer, or null if it's the first allocation
> + * @param previousAllocator : the previously used {@link ByteBufferAllocator}, or null if it's the first allocation
> * @param allocationNumber : the number of time the allocation has already failed.
> - * @return the buffer to allocate, or null if allocation has failed.
> + * @return the {@link ByteBufferAllocator} to use, or null if allocation has failed.
> */
> - OffHeapMemoryBuffer<T> getActiveBuffer( OffHeapMemoryBuffer<T> previouslyAllocatedBuffer, int allocationNumber );
> + ByteBufferAllocator getActiveAllocator( ByteBufferAllocator previousAllocator, int allocationNumber );
>
> /**
> * Reset internal state
>
> Modified: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManager.java
> URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManager.java?rev=1295522&r1=1295521&r2=1295522&view=diff
> ==============================================================================
> --- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManager.java (original)
> +++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManager.java Thu Mar 1 11:41:34 2012
> @@ -22,8 +22,6 @@ package org.apache.directmemory.memory;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> -import java.util.List;
> -
> public class MemoryManager
> {
> private static Logger logger = LoggerFactory.getLogger( MemoryManager.class );
> @@ -85,17 +83,6 @@ public class MemoryManager
> memoryManager.collectLFU();
> }
>
> - public static List<OffHeapMemoryBuffer<Object>> getBuffers()
> - {
> - return memoryManager.getBuffers();
> - }
> -
> -
> - public static OffHeapMemoryBuffer<Object> getActiveBuffer()
> - {
> - return memoryManager.getActiveBuffer();
> - }
> -
> public static MemoryManagerService<Object> getMemoryManager()
> {
> return memoryManager;
>
> Modified: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerService.java
> URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerService.java?rev=1295522&r1=1295521&r2=1295522&view=diff
> ==============================================================================
> --- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerService.java (original)
> +++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerService.java Thu Mar 1 11:41:34 2012
> @@ -19,7 +19,6 @@ package org.apache.directmemory.memory;
> * under the License.
> */
>
> -import java.util.List;
>
> public interface MemoryManagerService<V>
> {
> @@ -79,14 +78,12 @@ public interface MemoryManagerService<V>
>
> long capacity();
>
> + long used();
> +
> long collectExpired();
>
> void collectLFU();
>
> - List<OffHeapMemoryBuffer<V>> getBuffers();
> -
> - OffHeapMemoryBuffer<V> getActiveBuffer();
> -
> <T extends V> Pointer<V> allocate( Class<T> type, int size, long expiresIn, long expires );
> -
> +
> }
>
> Modified: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java
> URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java?rev=1295522&r1=1295521&r2=1295522&view=diff
> ==============================================================================
> --- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java (original)
> +++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java Thu Mar 1 11:41:34 2012
> @@ -19,144 +19,352 @@ package org.apache.directmemory.memory;
> * under the License.
> */
>
> +import static com.google.common.collect.Iterables.filter;
> +import static com.google.common.collect.Iterables.limit;
> +import static com.google.common.collect.Ordering.from;
> import static java.lang.String.format;
>
> +import java.nio.BufferOverflowException;
> +import java.nio.ByteBuffer;
> import java.util.ArrayList;
> +import java.util.Collections;
> +import java.util.Comparator;
> import java.util.List;
> +import java.util.Set;
> +import java.util.concurrent.ConcurrentHashMap;
> +import java.util.concurrent.atomic.AtomicLong;
>
> import org.apache.directmemory.measures.Ram;
> +import org.apache.directmemory.memory.allocator.ByteBufferAllocator;
> +import org.apache.directmemory.memory.allocator.MergingByteBufferAllocatorImpl;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> +import com.google.common.base.Predicate;
> +
> public class MemoryManagerServiceImpl<V>
> implements MemoryManagerService<V>
> {
>
> + protected static final long NEVER_EXPIRES = 0L;
> +
> protected static Logger logger = LoggerFactory.getLogger( MemoryManager.class );
>
> - protected List<OffHeapMemoryBuffer<V>> buffers = new ArrayList<OffHeapMemoryBuffer<V>>();
> + private List<ByteBufferAllocator> allocators;
>
> - protected int activeBufferIndex = 0;
> + private final Set<Pointer<V>> pointers = Collections.newSetFromMap( new ConcurrentHashMap<Pointer<V>, Boolean>() );
> +
> + protected int activeAllocatorIndex = 0;
> +
> + private final boolean returnNullWhenFull;
> +
> + protected final AtomicLong used = new AtomicLong( 0L );
>
> public MemoryManagerServiceImpl()
> {
> + this( true );
> + }
> +
> + public MemoryManagerServiceImpl( final boolean returnNullWhenFull )
> + {
> + this.returnNullWhenFull = returnNullWhenFull;
> }
>
> + @Override
> public void init( int numberOfBuffers, int size )
> {
> - buffers = new ArrayList<OffHeapMemoryBuffer<V>>( numberOfBuffers );
>
> + allocators = new ArrayList<ByteBufferAllocator>( numberOfBuffers );
> +
> for ( int i = 0; i < numberOfBuffers; i++ )
> {
> - final OffHeapMemoryBuffer<V> offHeapMemoryBuffer = instanciateOffHeapMemoryBuffer( size, i );
> - buffers.add( offHeapMemoryBuffer );
> + final ByteBufferAllocator allocator = instanciateByteBufferAllocator( i, size );
> + allocators.add( allocator );
> }
>
> logger.info( format( "MemoryManager initialized - %d buffers, %s each", numberOfBuffers, Ram.inMb( size ) ) );
> }
>
> - protected OffHeapMemoryBuffer<V> instanciateOffHeapMemoryBuffer( int size, int bufferNumber )
> +
> + protected ByteBufferAllocator instanciateByteBufferAllocator( final int allocatorNumber, final int size )
> {
> - return OffHeapMemoryBufferImpl.createNew( size, bufferNumber );
> + final MergingByteBufferAllocatorImpl allocator = new MergingByteBufferAllocatorImpl( allocatorNumber, size );
> +
> + // Hack to ensure the pointers are always splitted as it was the case before.
> + allocator.setMinSizeThreshold( 0.0 );
> + allocator.setSizeRatioThreshold( 1.0 );
> +
> + return allocator;
> }
>
> - public OffHeapMemoryBuffer<V> getActiveBuffer()
> + protected ByteBufferAllocator getAllocator( int allocatorIndex )
> {
> - return buffers.get( activeBufferIndex );
> + return allocators.get( allocatorIndex );
> }
>
> + @Override
> public Pointer<V> store( byte[] payload, int expiresIn )
> {
> - Pointer<V> p = getActiveBuffer().store( payload, expiresIn );
> - if ( p == null )
> +
> + int allocatorIndex = activeAllocatorIndex;
> +
> + ByteBuffer buffer = getAllocator( allocatorIndex ).allocate( payload.length );
> +
> + if (buffer == null && allocators.size() > 1)
> {
> - nextBuffer();
> - p = getActiveBuffer().store( payload, expiresIn );
> + allocatorIndex = nextAllocator();
> + buffer = getAllocator( allocatorIndex ).allocate( payload.length );
> }
> +
> + if (buffer == null)
> + {
> + if (returnsNullWhenFull())
> + {
> + return null;
> + }
> + else
> + {
> + throw new BufferOverflowException();
> + }
> + }
> +
> + buffer.rewind();
> + buffer.put( payload );
> +
> + Pointer<V> p = instanciatePointer( buffer, allocatorIndex, expiresIn, NEVER_EXPIRES );
> +
> + used.addAndGet( payload.length );
> +
> return p;
> }
>
> + @Override
> public Pointer<V> store( byte[] payload )
> {
> return store( payload, 0 );
> }
>
> + @Override
> public Pointer<V> update( Pointer<V> pointer, byte[] payload )
> {
> - return buffers.get( pointer.getBufferNumber() ).update( pointer, payload );
> + free( pointer );
> + return store( payload );
> }
>
> - public byte[] retrieve( Pointer<V> pointer )
> + @Override
> + public byte[] retrieve( final Pointer<V> pointer )
> {
> - return buffers.get( pointer.getBufferNumber() ).retrieve( pointer );
> + // check if pointer has not been freed before
> + if (!pointers.contains( pointer ))
> + {
> + return null;
> + }
> +
> + pointer.hit();
> +
> + final ByteBuffer buf = pointer.getDirectBuffer().asReadOnlyBuffer();
> + buf.rewind();
> +
> + final byte[] swp = new byte[buf.limit()];
> + buf.get( swp );
> + return swp;
> }
>
> - public void free( Pointer<V> pointer )
> + @Override
> + public void free( final Pointer<V> pointer )
> {
> - buffers.get( pointer.getBufferNumber() ).free( pointer );
> + if ( !pointers.remove( pointer ) )
> + {
> + // pointers has been already freed.
> + //throw new IllegalArgumentException( "This pointer " + pointer + " has already been freed" );
> + return;
> + }
> +
> + getAllocator( pointer.getBufferNumber() ).free( pointer.getDirectBuffer() );
> +
> + used.addAndGet( - pointer.getCapacity() );
> +
> + pointer.setFree( true );
> }
>
> + @Override
> public void clear()
> {
> - for ( OffHeapMemoryBuffer<V> buffer : buffers )
> + pointers.clear();
> + for (ByteBufferAllocator allocator : allocators)
> {
> - buffer.clear();
> + allocator.clear();
> }
> - activeBufferIndex = 0;
> }
>
> + @Override
> public long capacity()
> {
> long totalCapacity = 0;
> - for ( OffHeapMemoryBuffer<V> buffer : buffers )
> + for (ByteBufferAllocator allocator : allocators)
> {
> - totalCapacity += buffer.capacity();
> + totalCapacity += allocator.getCapacity();
> }
> return totalCapacity;
> }
> +
> + @Override
> + public long used()
> + {
> + return used.get();
> + }
>
> - public long collectExpired()
> + private final Predicate<Pointer<V>> relative = new Predicate<Pointer<V>>()
> {
> - long disposed = 0;
> - for ( OffHeapMemoryBuffer<V> buffer : buffers )
> +
> + @Override
> + public boolean apply( Pointer<V> input )
> {
> - disposed += buffer.collectExpired();
> + return !input.isFree() && !input.isExpired();
> }
> - return disposed;
> - }
>
> - public void collectLFU()
> + };
> +
> + private final Predicate<Pointer<V>> absolute = new Predicate<Pointer<V>>()
> {
> - for ( OffHeapMemoryBuffer<V> buf : buffers )
> +
> + @Override
> + public boolean apply( Pointer<V> input )
> {
> - buf.collectLFU( -1 );
> + return !input.isFree() && !input.isExpired();
> }
> - }
>
> - public List<OffHeapMemoryBuffer<V>> getBuffers()
> + };
> +
> + @Override
> + public long collectExpired()
> {
> - return buffers;
> + int limit = 50;
> + return free( limit( filter( pointers, relative ), limit ) )
> + + free( limit( filter( pointers, absolute ), limit ) );
> +
> }
>
> - public void setBuffers( List<OffHeapMemoryBuffer<V>> buffers )
> + @Override
> + public void collectLFU()
> {
> - this.buffers = buffers;
> +
> + int limit = pointers.size() / 10;
> +
> + Iterable<Pointer<V>> result = from( new Comparator<Pointer<V>>()
> + {
> +
> + public int compare( Pointer<V> o1, Pointer<V> o2 )
> + {
> + float f1 = o1.getFrequency();
> + float f2 = o2.getFrequency();
> +
> + return Float.compare( f1, f2 );
> + }