You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directmemory.apache.org by bp...@apache.org on 2012/01/06 14:33:16 UTC

svn commit: r1228170 - in /incubator/directmemory/trunk/directmemory-cache/src: main/java/org/apache/directmemory/memory/ test/java/org/apache/directmemory/memory/test/

Author: bperroud
Date: Fri Jan  6 13:33:16 2012
New Revision: 1228170

URL: http://svn.apache.org/viewvc?rev=1228170&view=rev
Log:
DIRECTMEMORY-9, DIRECTMEMORY-40 : OffHeapMemoryBuffer implementation that reduce fragmentation by merging adjacent pointers together when freeing

Added:
    incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMergingMemoryBufferImpl.java
    incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/AbstractOffHeapMemoryBufferTest.java
    incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/OffHeapMemoryBufferTest.java
    incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/OffHeapMergingMemoryBufferTest.java
Modified:
    incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/AbstractOffHeapMemoryBuffer.java
    incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMemoryBufferImpl.java

Modified: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/AbstractOffHeapMemoryBuffer.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/AbstractOffHeapMemoryBuffer.java?rev=1228170&r1=1228169&r2=1228170&view=diff
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/AbstractOffHeapMemoryBuffer.java (original)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/AbstractOffHeapMemoryBuffer.java Fri Jan  6 13:33:16 2012
@@ -233,4 +233,20 @@ public abstract class AbstractOffHeapMem
         pointer.clazz = null;
         pointer.directBuffer = null;
     }
+    
+    protected void setExpiration( final Pointer pointer, long expiresIn, long expires )
+    {
+
+        if ( expiresIn > 0 )
+        {
+            pointer.expiresIn = expiresIn;
+            pointer.expires = 0;
+        }
+        else if ( expires > 0 )
+        {
+            pointer.expiresIn = 0;
+            pointer.expires = expires;
+        }
+    }
+    
 }

Modified: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMemoryBufferImpl.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMemoryBufferImpl.java?rev=1228170&r1=1228169&r2=1228170&view=diff
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMemoryBufferImpl.java (original)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMemoryBufferImpl.java Fri Jan  6 13:33:16 2012
@@ -184,16 +184,7 @@ public class OffHeapMemoryBufferImpl
         Pointer fresh = slice( goodOne, payload.length );
 
         fresh.created = System.currentTimeMillis();
-        if ( expiresIn > 0 )
-        {
-            fresh.expiresIn = expiresIn;
-            fresh.expires = 0;
-        }
-        else if ( expires > 0 )
-        {
-            fresh.expiresIn = 0;
-            fresh.expires = expires;
-        }
+        setExpiration( fresh, expiresIn, expires );
 
         fresh.free = false;
         used.addAndGet( payload.length );
@@ -294,16 +285,7 @@ public class OffHeapMemoryBufferImpl
         Pointer fresh = slice( goodOne, size );
 
         fresh.created = System.currentTimeMillis();
-        if ( expiresIn > 0 )
-        {
-            fresh.expiresIn = expiresIn;
-            fresh.expires = 0;
-        }
-        else if ( expires > 0 )
-        {
-            fresh.expiresIn = 0;
-            fresh.expires = expires;
-        }
+        setExpiration( fresh, expiresIn, expires );
 
         fresh.free = false;
         used.addAndGet( size );

Added: incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMergingMemoryBufferImpl.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMergingMemoryBufferImpl.java?rev=1228170&view=auto
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMergingMemoryBufferImpl.java (added)
+++ incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMergingMemoryBufferImpl.java Fri Jan  6 13:33:16 2012
@@ -0,0 +1,483 @@
+package org.apache.directmemory.memory;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.directmemory.measures.Ram;
+import org.apache.directmemory.misc.Format;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link OffHeapMemoryBuffer} implementation that internally user 3 data structures to
+ * store the pointers :
+ * 
+ * - 1 sorted list backed by a ({@link TreeMap}) that store the free pointers sorted
+ * by size desc, used to allocate memory efficiently : if the first pointer has
+ * not enough capacity, then no other pointers will.
+ * 
+ * - 1 sorted list backed by a ({@link TreeMap}) that store the free pointers sorted
+ * by address offset, used to merge freed pointer efficiently : when freeing a
+ * pointer, direct lookup and navigation in this list will find adjacent
+ * pointers
+ * 
+ * - 1 set backed by ({@link ConcurrentHashMap}) of used pointers, to not loose a
+ * reference to a pointer, and to be able to the buffer and stay in a consistent
+ * state
+ * 
+ * {@link TreeMap} can be safely used because synchronization is achieved through a {@link Lock}
+ * 
+ * @author bperroud
+ * 
+ */
+public class OffHeapMergingMemoryBufferImpl
+    extends AbstractOffHeapMemoryBuffer
+{
+
+    protected static Logger logger = LoggerFactory.getLogger( OffHeapMemoryBufferImpl.class );
+
+    // Default value as most list and set are backed by maps.
+    private final static Boolean DEFAULT_VALUE = Boolean.TRUE;
+
+    // List of free pointers sorted by size
+    private final TreeMap<Pointer, Boolean> freePointersBySizeDesc = new TreeMap<Pointer, Boolean>(
+                                                                                                    new PointerBySizeDesc() );
+
+    // List of free pointers sorted by memory offset
+    private final TreeMap<Pointer, Boolean> freePointersByMemoryOffsetAsc = new TreeMap<Pointer, Boolean>(
+                                                                                                           new PointerByMemoryOffsetAsc() );
+
+    // Set of used pointers
+    private final Set<Pointer> usedPointers = Collections.newSetFromMap( new ConcurrentHashMap<Pointer, Boolean>() );
+
+    // Lock used instead of synchronized block to guarantee consistency when manipulating list of pointers.
+    private final Lock pointerManipulationLock = new ReentrantLock();
+
+    protected Logger getLogger()
+    {
+        return logger;
+    }
+
+    /**
+     * Static constructor.
+     * @param capacity : size in byte of the internal buffer
+     * @param bufferNumber : arbitrary number of the buffer.
+     * @return an OffHeapMemoryBuffer with internal buffer allocated.
+     */
+    public static OffHeapMergingMemoryBufferImpl createNew( int capacity, int bufferNumber )
+    {
+        logger.info( Format.it( "Creating OffHeapLinkedMemoryBuffer %d with a capacity of %s", bufferNumber,
+                                Ram.inMb( capacity ) ) );
+        return new OffHeapMergingMemoryBufferImpl( ByteBuffer.allocateDirect( capacity ), bufferNumber );
+    }
+
+    /**
+     * Static constructor. bufferNumber will be set to -1.
+     * @param capacity : size in byte of the internal buffer
+     * @return an OffHeapMemoryBuffer with internal buffer allocated.
+     */
+    public static OffHeapMergingMemoryBufferImpl createNew( int capacity )
+    {
+        return new OffHeapMergingMemoryBufferImpl( ByteBuffer.allocateDirect( capacity ), -1 );
+    }
+
+    /**
+     * Constructor.
+     * @param buffer : the internal buffer
+     * @param bufferNumber : arbitrary number of the buffer.
+     */
+    private OffHeapMergingMemoryBufferImpl( ByteBuffer buffer, int bufferNumber )
+    {
+        super( buffer, bufferNumber );
+        createAndAddFirstPointer();
+    }
+
+    /**
+     * Initialization function. Create an initial free {@link Pointer} mapping the whole buffer.
+     */
+    protected Pointer createAndAddFirstPointer()
+    {
+        Pointer first = new Pointer( 0, buffer.capacity() - 1 );
+        first.bufferNumber = bufferNumber;
+        first.free = true;
+        freePointersBySizeDesc.put( first, DEFAULT_VALUE );
+        freePointersByMemoryOffsetAsc.put( first, DEFAULT_VALUE );
+        return first;
+    }
+
+    protected Pointer firstMatch( int capacity )
+    {
+        // check for empty instead of throwing an exception.
+        if ( freePointersBySizeDesc.isEmpty() )
+        {
+            return null;
+        }
+        try
+        {
+            Pointer ptr = freePointersBySizeDesc.firstKey();
+
+            if ( ptr.getCapacity() >= capacity )
+            { // 0 indexed
+                return ptr;
+            }
+        }
+        catch ( NoSuchElementException e )
+        {
+            // noop, just return null at the end of the function.
+        }
+        return null;
+    }
+
+    @Override
+    public byte[] retrieve( Pointer pointer )
+    {
+        pointer.lastHit = System.currentTimeMillis();
+        pointer.hits++;
+
+        ByteBuffer buf = buffer.duplicate();
+        buf.position( pointer.start );
+
+        final byte[] swp = new byte[pointer.getCapacity()];
+        buf.get( swp );
+        return swp;
+    }
+
+    @Override
+    public int free( Pointer pointer2free )
+    {
+
+        // Avoid freeing twice the same pointer. Maybe atomic boolean is required here.
+        if ( !pointer2free.free )
+        {
+
+            try
+            {
+                pointerManipulationLock.lock();
+
+                if ( !usedPointers.remove( pointer2free ) )
+                {
+                    return 0;
+                }
+
+                Pointer lowerPointerToMerge = pointer2free;
+
+                // search for adjacent pointers lower than the current one 
+                for ( Pointer adjacentPointer : freePointersByMemoryOffsetAsc.headMap( pointer2free, false )
+                    .descendingKeySet() )
+                {
+
+                    if ( adjacentPointer.end + 1 != lowerPointerToMerge.start )
+                    {
+                        break;
+                    }
+
+                    lowerPointerToMerge = adjacentPointer;
+                }
+
+                Pointer higherPointerToMerge = pointer2free;
+
+                // search for adjacent pointers higher than the current one
+                for ( Pointer adjacentPointer : freePointersByMemoryOffsetAsc.tailMap( pointer2free, false )
+                    .navigableKeySet() )
+                {
+
+                    if ( adjacentPointer.start - 1 != higherPointerToMerge.end )
+                    {
+                        break;
+                    }
+
+                    higherPointerToMerge = adjacentPointer;
+                }
+
+                // if there is adjacent pointers
+                if ( lowerPointerToMerge != higherPointerToMerge )
+                {
+
+                    final Pointer mergedPointer = new Pointer( lowerPointerToMerge.start, higherPointerToMerge.end );
+                    mergedPointer.free = true;
+
+                    final Iterator<Pointer> adjacentPointersIterator = freePointersByMemoryOffsetAsc
+                        .subMap( lowerPointerToMerge, true, higherPointerToMerge, true ).navigableKeySet().iterator();
+                    while ( adjacentPointersIterator.hasNext() )
+                    {
+                        Pointer adjacentPointer = adjacentPointersIterator.next();
+                        adjacentPointer.free = true; // if a reference to the pointer is kept, we must not use it.
+                        freePointersBySizeDesc.remove( adjacentPointer );
+                        adjacentPointersIterator.remove();
+                    }
+
+                    freePointersByMemoryOffsetAsc.put( mergedPointer, DEFAULT_VALUE );
+                    freePointersBySizeDesc.put( mergedPointer, DEFAULT_VALUE );
+                }
+                else
+                {
+                    freePointersByMemoryOffsetAsc.put( pointer2free, DEFAULT_VALUE );
+                    freePointersBySizeDesc.put( pointer2free, DEFAULT_VALUE );
+                }
+
+            }
+            finally
+            {
+                pointerManipulationLock.unlock();
+            }
+
+            resetPointer( pointer2free );
+
+            final int pointerCapacity = pointer2free.getCapacity();
+            used.addAndGet( -pointerCapacity );
+
+            return pointerCapacity;
+        }
+        else
+        {
+            return 0;
+        }
+    }
+
+    /**
+     * Clear the buffer. Free all used pointers, clear all structures and create a new initial pointer.
+     */
+    @Override
+    public void clear()
+    {
+        allocationErrors = 0;
+
+        for ( Pointer pointer : usedPointers )
+        {
+            pointer.free = true;
+            //            free( pointer ); // too costly to merge every pointers while the will be cleared in a row
+        }
+        usedPointers.clear();
+        freePointersByMemoryOffsetAsc.clear();
+        freePointersBySizeDesc.clear();
+
+        createAndAddFirstPointer();
+        buffer.clear();
+        used.set( 0 );
+    }
+
+    @Override
+    protected Pointer store( byte[] payload, long expiresIn, long expires )
+    {
+        final int size = payload.length;
+
+        final Pointer allocatedPointer = allocatePointer( size );
+
+        if ( allocatedPointer != null )
+        {
+
+            setExpiration( allocatedPointer, expiresIn, expires );
+
+            allocatedPointer.free = false;
+
+            final ByteBuffer buf = buffer.duplicate();
+            buf.position( allocatedPointer.start );
+            buf.limit( allocatedPointer.start + size );
+
+            buf.put( payload );
+
+            used.addAndGet( size );
+
+        }
+
+        return allocatedPointer;
+    }
+
+    private Pointer allocatePointer( int size )
+    {
+
+        Pointer goodOne, fresh = null;
+
+        try
+        {
+
+            pointerManipulationLock.lock();
+
+            goodOne = firstMatch( size );
+
+            if ( goodOne == null )
+            {
+                allocationErrors++;
+                return null; // not enough space on this buffer. 
+            }
+
+            // Remove good pointer because it's size and offset will change.
+            freePointersByMemoryOffsetAsc.remove( goodOne );
+            freePointersBySizeDesc.remove( goodOne );
+
+            //fresh = slice(goodOne, size);
+            fresh = goodOne;
+
+            if ( goodOne.getCapacity() != size )
+            {
+
+                fresh = new Pointer( goodOne.start, goodOne.start + size - 1 );
+                fresh.bufferNumber = getBufferNumber();
+                fresh.free = true;
+                fresh.created = System.currentTimeMillis();
+
+                // create a new pointer for the remaining space 
+                final Pointer newGoodOne = new Pointer( fresh.end + 1, goodOne.end );
+                newGoodOne.free = true;
+
+                // and add it to the free lists
+                freePointersByMemoryOffsetAsc.put( newGoodOne, DEFAULT_VALUE );
+                freePointersBySizeDesc.put( newGoodOne, DEFAULT_VALUE );
+            }
+
+            usedPointers.add( fresh );
+
+        }
+        finally
+        {
+            pointerManipulationLock.unlock();
+        }
+
+        return fresh;
+
+    }
+
+    @Override
+    public Pointer allocate( int size, long expiresIn, long expires )
+    {
+
+        final Pointer allocatedPointer = allocatePointer( size );
+
+        if ( allocatedPointer != null )
+        {
+            setExpiration( allocatedPointer, expiresIn, expires );
+
+            allocatedPointer.free = false;
+
+            final ByteBuffer buf = buffer.duplicate();
+            buf.position( allocatedPointer.start );
+            buf.limit( allocatedPointer.start + size );
+            allocatedPointer.directBuffer = buf.slice();
+
+            used.addAndGet( size );
+        }
+
+        return allocatedPointer;
+    }
+
+    /**
+     * Sort {@link Pointer}s by size desc.
+     */
+    private static class PointerBySizeDesc
+        implements Comparator<Pointer>
+    {
+
+        @Override
+        public int compare( final Pointer pointer0, final Pointer pointer1 )
+        {
+            final int size0 = pointer0.getCapacity();
+            final int size1 = pointer1.getCapacity();
+
+            if ( size0 > size1 )
+            {
+                return -1;
+            }
+            else
+            {
+                if ( size0 == size1 )
+                {
+                    return 0;
+                }
+                else
+                {
+                    return 1;
+                }
+            }
+        }
+    }
+
+    /**
+     * Sort {@link Pointer}s by memory offset asc.
+     */
+    private static class PointerByMemoryOffsetAsc
+        implements Comparator<Pointer>
+    {
+
+        @Override
+        public int compare( final Pointer pointer0, final Pointer pointer1 )
+        {
+            final int offset0 = pointer0.start;
+            final int offset1 = pointer1.start;
+
+            if ( offset0 < offset1 )
+            {
+                return -1;
+            }
+            else
+            {
+                if ( offset0 == offset1 )
+                {
+                    return 0;
+                }
+                else
+                {
+                    return 1;
+                }
+            }
+        }
+    }
+
+    @Override
+    protected List<Pointer> getUsedPointers()
+    {
+        return new ArrayList<Pointer>( usedPointers );
+    }
+
+    @Override
+    public Pointer update( Pointer pointer, byte[] payload )
+    {
+        if ( payload.length > pointer.getCapacity() )
+        {
+            throw new BufferOverflowException();
+        }
+        // Create an independent view of the buffer
+        final ByteBuffer buf = buffer.duplicate();
+        // Set it at the right start offset
+        buf.position( pointer.start );
+        // Write the content in the shared buffer
+        buf.put( payload );
+
+        return pointer;
+    }
+
+    public List<Pointer> getPointers()
+    {
+        // TODO : remove this conversion from Set to List ...
+        return new ArrayList<Pointer>( usedPointers );
+    }
+}

Added: incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/AbstractOffHeapMemoryBufferTest.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/AbstractOffHeapMemoryBufferTest.java?rev=1228170&view=auto
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/AbstractOffHeapMemoryBufferTest.java (added)
+++ incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/AbstractOffHeapMemoryBufferTest.java Fri Jan  6 13:33:16 2012
@@ -0,0 +1,436 @@
+package org.apache.directmemory.memory.test;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import junit.framework.Assert;
+
+import org.apache.directmemory.memory.MemoryManagerService;
+import org.apache.directmemory.memory.OffHeapMergingMemoryBufferImpl;
+import org.apache.directmemory.memory.OffHeapMemoryBuffer;
+import org.apache.directmemory.memory.Pointer;
+import org.junit.Test;
+
+public abstract class AbstractOffHeapMemoryBufferTest
+{
+
+    protected static final Random R = new Random();
+
+    protected static final int SMALL_PAYLOAD_LENGTH = 4;
+    protected static final byte[] SMALL_PAYLOAD = MemoryTestUtils.generateRandomPayload( SMALL_PAYLOAD_LENGTH );
+
+    
+    protected abstract OffHeapMemoryBuffer instanciateOffHeapMemoryBuffer( int bufferSize );
+    
+    /**
+     * Test pointers allocation, when buffer size is not aligned with the size of stored objects.
+     * Null {@link Pointer} should be returned to allow {@link MemoryManagerService} to go to next step with allocation policy.
+     */
+    @Test
+    public void testNotEnoughFreeSpace()
+    {
+
+        // Storing a first payload of 4 bytes, 1 byte remaining in the buffer. When storing a second 4 bytes payload, an null pointer should be returned.
+
+        final int BUFFER_SIZE = SMALL_PAYLOAD_LENGTH + 1;
+
+        final OffHeapMemoryBuffer offHeapMemoryBuffer = instanciateOffHeapMemoryBuffer( BUFFER_SIZE );
+
+        Pointer pointer1 = offHeapMemoryBuffer.store( SMALL_PAYLOAD );
+        Assert.assertNotNull( pointer1 );
+        Assert.assertFalse( pointer1.free );
+        Assert.assertNull( pointer1.directBuffer );
+
+        Pointer pointer2 = offHeapMemoryBuffer.store( SMALL_PAYLOAD );
+        Assert.assertNull( pointer2 );
+
+    }
+    
+
+    /**
+     * Ensure no byte is leaking when allocating several objects.
+     */
+    @Test
+    public void testByteLeaking()
+    {
+
+        // Initializing 1 buffer of 10*4 bytes, should be able to allocate 10 objects of 4 bytes.
+
+        final int NUMBER_OF_OBJECTS = 10;
+        final int BUFFER_SIZE = NUMBER_OF_OBJECTS * SMALL_PAYLOAD_LENGTH;
+        
+        final OffHeapMemoryBuffer offHeapMemoryBuffer = instanciateOffHeapMemoryBuffer( BUFFER_SIZE );
+
+        for ( int i = 0; i < NUMBER_OF_OBJECTS; i++ )
+        {
+            Pointer pointer = offHeapMemoryBuffer.store( SMALL_PAYLOAD );
+            Assert.assertNotNull( pointer );
+        }
+
+        Pointer pointerNull = offHeapMemoryBuffer.store( SMALL_PAYLOAD );
+        Assert.assertNull( pointerNull );
+    }
+
+    /**
+     * Ensure memory usage is reported correctly
+     */
+    @Test
+    public void testReportCorrectUsedMemory()
+    {
+
+        // Initializing 1 buffer of 4*4 bytes, storing and freeing and storing again should report correct numbers.
+
+        final int NUMBER_OF_OBJECTS = 4;
+        final int BUFFER_SIZE = NUMBER_OF_OBJECTS * SMALL_PAYLOAD_LENGTH;
+
+        final OffHeapMemoryBuffer offHeapMemoryBuffer = instanciateOffHeapMemoryBuffer( BUFFER_SIZE );
+
+        Pointer lastPointer = null;
+        for ( int i = 0; i < NUMBER_OF_OBJECTS; i++ )
+        {
+            Pointer pointer = offHeapMemoryBuffer.store( SMALL_PAYLOAD );
+            Assert.assertNotNull( pointer );
+            lastPointer = pointer;
+        }
+
+        // Buffer is fully used.
+        Assert.assertEquals( BUFFER_SIZE, offHeapMemoryBuffer.used() );
+
+        Assert.assertNotNull( lastPointer );
+        offHeapMemoryBuffer.free( lastPointer );
+
+        Pointer pointerNotNull = offHeapMemoryBuffer.store( SMALL_PAYLOAD );
+        Assert.assertNotNull( pointerNotNull );
+
+        // Buffer again fully used.
+        Assert.assertEquals( BUFFER_SIZE, offHeapMemoryBuffer.used() );
+
+    }
+
+    /**
+     * Completely fill the buffer, free some pointer, reallocated the freed space, clear the buffer. The entire space should be  
+     */
+    @Test
+    public void testFullFillAndFreeAndClearBuffer()
+    {
+
+        final int NUMBER_OF_OBJECTS = 10;
+        final int BUFFER_SIZE = NUMBER_OF_OBJECTS * SMALL_PAYLOAD_LENGTH;
+
+        final OffHeapMemoryBuffer offHeapMemoryBuffer = instanciateOffHeapMemoryBuffer( BUFFER_SIZE );
+
+        Pointer pointerFull = offHeapMemoryBuffer.store( MemoryTestUtils.generateRandomPayload( BUFFER_SIZE ) );
+        Assert.assertNotNull( pointerFull );
+        offHeapMemoryBuffer.free( pointerFull );
+
+        final int size1 = R.nextInt( BUFFER_SIZE / 2 ) + 1;
+        Pointer pointer1 = offHeapMemoryBuffer.store( MemoryTestUtils.generateRandomPayload( size1 ) );
+        Assert.assertNotNull( "Cannot store " + size1 + " bytes", pointer1 );
+
+        final int size2 = R.nextInt( ( BUFFER_SIZE - size1 ) / 2 ) + 1;
+        Pointer pointer2 = offHeapMemoryBuffer.store( MemoryTestUtils.generateRandomPayload( size2 ) );
+        Assert.assertNotNull( "Cannot store " + size2 + " bytes", pointer2 );
+
+        final int size3 = R.nextInt( ( BUFFER_SIZE - size1 - size2 ) / 2 ) + 1;
+        Pointer pointer3 = offHeapMemoryBuffer.store( MemoryTestUtils.generateRandomPayload( size3 ) );
+        Assert.assertNotNull( "Cannot store " + size3 + " bytes", pointer3 );
+
+        final int size4 = BUFFER_SIZE - size1 - size2 - size3;
+        Pointer pointer4 = offHeapMemoryBuffer.store( MemoryTestUtils.generateRandomPayload( size4 ) );
+        Assert.assertNotNull( "Cannot store " + size4 + " bytes", pointer4 );
+
+        offHeapMemoryBuffer.free( pointer1 );
+        Assert.assertTrue( pointer1.free );
+        
+        offHeapMemoryBuffer.free( pointer3 );
+
+        offHeapMemoryBuffer.free( pointer4 );
+
+        offHeapMemoryBuffer.free( pointer2 );
+
+        Assert.assertEquals( 0, offHeapMemoryBuffer.used() );
+
+        // As all pointers have been freeed, we should be able to reallocate the whole buffer
+        Pointer pointer6 = offHeapMemoryBuffer.store( MemoryTestUtils.generateRandomPayload( BUFFER_SIZE ) );
+        Assert.assertNotNull( "Cannot store " + BUFFER_SIZE + " bytes", pointer6 );
+
+        offHeapMemoryBuffer.clear();
+
+        // As all pointers have been cleared, we should be able to reallocate the whole buffer
+        Pointer pointer7 = offHeapMemoryBuffer.store( MemoryTestUtils.generateRandomPayload( BUFFER_SIZE ) );
+        Assert.assertNotNull( "Cannot store " + BUFFER_SIZE + " bytes", pointer7 );
+
+        offHeapMemoryBuffer.clear();
+
+        // As all pointers have been cleared, we should be able to reallocate the whole buffer
+        for ( int i = 0; i < NUMBER_OF_OBJECTS; i++ )
+        {
+            Pointer pointer = offHeapMemoryBuffer.store( SMALL_PAYLOAD );
+            Assert.assertNotNull( pointer );
+        }
+
+        offHeapMemoryBuffer.clear();
+
+        // As all pointers have been cleared, we should be able to reallocate the whole buffer
+        Pointer pointer8 = offHeapMemoryBuffer.store( MemoryTestUtils.generateRandomPayload( BUFFER_SIZE ) );
+        Assert.assertNotNull( "Cannot store " + BUFFER_SIZE + " bytes", pointer8 );
+
+        offHeapMemoryBuffer.free( pointer8 );
+
+        // As all pointers have been cleared, we should be able to reallocate the whole buffer
+        for ( int i = 0; i < NUMBER_OF_OBJECTS * 10; i++ )
+        {
+            Pointer pointer = offHeapMemoryBuffer.store( SMALL_PAYLOAD );
+            Assert.assertNotNull( pointer );
+            offHeapMemoryBuffer.free( pointer );
+        }
+
+        // After a clear occurs, pointers allocated before the clear should be set as "free"
+        Assert.assertTrue( pointer6.free );
+        Assert.assertTrue( pointer7.free );
+        
+    }
+
+    @Test
+    public void testRandomPayload()
+    {
+
+        final int NUMBER_OF_OBJECTS = 10;
+        final int BUFFER_SIZE = NUMBER_OF_OBJECTS * SMALL_PAYLOAD_LENGTH;
+
+        final OffHeapMemoryBuffer offHeapMemoryBuffer = instanciateOffHeapMemoryBuffer( BUFFER_SIZE );
+
+        for ( int i = 0; i < NUMBER_OF_OBJECTS; i++ )
+        {
+            byte[] payload = MemoryTestUtils.generateRandomPayload( SMALL_PAYLOAD_LENGTH );
+            Pointer pointer = offHeapMemoryBuffer.store( payload );
+            Assert.assertNotNull( pointer );
+            byte[] fetchedPayload = offHeapMemoryBuffer.retrieve( pointer );
+            Assert.assertEquals( new String( payload ), new String( fetchedPayload ) );
+            if ( R.nextBoolean() )
+            {
+                offHeapMemoryBuffer.free( pointer );
+            }
+        }
+
+        offHeapMemoryBuffer.clear();
+
+        for ( int i = 0; i < NUMBER_OF_OBJECTS; i++ )
+        {
+            byte[] payload = MemoryTestUtils.generateRandomPayload( SMALL_PAYLOAD_LENGTH );
+            Pointer pointer = offHeapMemoryBuffer.store( payload );
+            Assert.assertNotNull( pointer );
+            byte[] fetchedPayload = offHeapMemoryBuffer.retrieve( pointer );
+            Assert.assertEquals( new String( payload ), new String( fetchedPayload ) );
+            if ( R.nextBoolean() )
+            {
+                offHeapMemoryBuffer.free( pointer );
+                i--;
+            }
+        }
+
+    }
+
+    @Test
+    public void testStoreAllocAndFree()
+    {
+
+        final int NUMBER_OF_OBJECTS = 100;
+        final int BUFFER_SIZE = NUMBER_OF_OBJECTS * SMALL_PAYLOAD_LENGTH;
+
+        final OffHeapMemoryBuffer offHeapMemoryBuffer = instanciateOffHeapMemoryBuffer( BUFFER_SIZE );
+
+        List<Pointer> pointers = new ArrayList<Pointer>( NUMBER_OF_OBJECTS );
+        for ( int i = 0; i < NUMBER_OF_OBJECTS; i++ )
+        {
+            byte[] payload = MemoryTestUtils.generateRandomPayload( SMALL_PAYLOAD_LENGTH );
+            Pointer pointer = offHeapMemoryBuffer.store( payload );
+            Assert.assertNotNull( pointer );
+            pointers.add( pointer );
+            byte[] fetchedPayload = offHeapMemoryBuffer.retrieve( pointer );
+            Assert.assertEquals( new String( payload ), new String( fetchedPayload ) );
+        }
+
+        // Free 1/4 of the pointers, from 1/4 of the address space to 1/2
+        for ( int i = NUMBER_OF_OBJECTS / 4; i < NUMBER_OF_OBJECTS / 2; i++ )
+        {
+            Pointer pointer = pointers.get( i );
+            offHeapMemoryBuffer.free( pointer );
+        }
+
+        // Should be able to allocate NUMBER_OF_OBJECTS / 4 * SMALL_PAYLOAD_LENGTH bytes
+        Pointer pointer1 = offHeapMemoryBuffer.allocate( NUMBER_OF_OBJECTS / 4 * SMALL_PAYLOAD_LENGTH, 0, 0 );
+        Assert.assertNotNull( pointer1 );
+
+        int pointerToSkip = NUMBER_OF_OBJECTS / 2 + NUMBER_OF_OBJECTS / 10;
+        for ( int i = NUMBER_OF_OBJECTS / 2; i < NUMBER_OF_OBJECTS * 3 / 4; i++ )
+        {
+            // skip one pointer 
+            if ( i == pointerToSkip )
+            {
+                continue;
+            }
+            Pointer pointer = pointers.get( i );
+            offHeapMemoryBuffer.free( pointer );
+        }
+
+        // Should NOT be able to allocate NUMBER_OF_OBJECTS / 4 * SMALL_PAYLOAD_LENGTH bytes
+        Pointer pointer2 = offHeapMemoryBuffer.allocate( NUMBER_OF_OBJECTS / 4 * SMALL_PAYLOAD_LENGTH, 0, 0 );
+        Assert.assertNull( pointer2 );
+
+        // Freeing the previously skipped pointer should then merge the whole memory space
+        offHeapMemoryBuffer.free( pointers.get( pointerToSkip ) );
+
+        // Should be able to allocate NUMBER_OF_OBJECTS / 4 * SMALL_PAYLOAD_LENGTH bytes
+        Pointer pointer3 = offHeapMemoryBuffer.allocate( NUMBER_OF_OBJECTS / 4 * SMALL_PAYLOAD_LENGTH, 0, 0 );
+        Assert.assertNotNull( pointer3 );
+
+        byte[] payload3 = MemoryTestUtils.generateRandomPayload( NUMBER_OF_OBJECTS / 4 * SMALL_PAYLOAD_LENGTH );
+        pointer3.directBuffer.put( payload3 );
+        byte[] retrievePayload3 = offHeapMemoryBuffer.retrieve( pointer3 );
+        Assert.assertEquals( new String( payload3 ), new String( retrievePayload3 ) );
+
+    }
+
+    @Test
+    public void testRandomPayload2()
+    {
+
+        final int NUMBER_OF_OBJECTS = 10;
+        final int BUFFER_SIZE = NUMBER_OF_OBJECTS * SMALL_PAYLOAD_LENGTH;
+
+        final OffHeapMergingMemoryBufferImpl offHeapLinkedMemoryBuffer = OffHeapMergingMemoryBufferImpl
+            .createNew( BUFFER_SIZE );
+
+        byte[] payload1 = MemoryTestUtils.generateRandomPayload( 2 * SMALL_PAYLOAD_LENGTH );
+        Pointer pointer1 = offHeapLinkedMemoryBuffer.store( payload1 );
+        Assert.assertNotNull( pointer1 );
+
+        byte[] fetchedPayload1 = offHeapLinkedMemoryBuffer.retrieve( pointer1 );
+        Assert.assertEquals( new String( payload1 ), new String( fetchedPayload1 ) );
+
+        byte[] payload2 = MemoryTestUtils.generateRandomPayload( SMALL_PAYLOAD_LENGTH );
+        Pointer pointer2 = offHeapLinkedMemoryBuffer.store( payload2 );
+        Assert.assertNotNull( pointer2 );
+
+        byte[] fetchedPayload2 = offHeapLinkedMemoryBuffer.retrieve( pointer2 );
+        Assert.assertEquals( new String( payload2 ), new String( fetchedPayload2 ) );
+
+        offHeapLinkedMemoryBuffer.free( pointer1 );
+
+        offHeapLinkedMemoryBuffer.free( pointer1 );
+
+        byte[] payload3 = MemoryTestUtils.generateRandomPayload( SMALL_PAYLOAD_LENGTH );
+        Pointer pointer3 = offHeapLinkedMemoryBuffer.store( payload3 );
+        Assert.assertNotNull( pointer3 );
+
+        byte[] fetchedPayload3 = offHeapLinkedMemoryBuffer.retrieve( pointer3 );
+        Assert.assertEquals( new String( payload3 ), new String( fetchedPayload3 ) );
+
+    }
+
+    @Test
+    public void testUpdate()
+    {
+
+        final int NUMBER_OF_OBJECTS = 1;
+        final int BUFFER_SIZE = NUMBER_OF_OBJECTS * SMALL_PAYLOAD_LENGTH;
+
+        final OffHeapMemoryBuffer offHeapMemoryBuffer = instanciateOffHeapMemoryBuffer( BUFFER_SIZE );
+
+        final byte[] payload = MemoryTestUtils.generateRandomPayload( SMALL_PAYLOAD_LENGTH );
+
+        final Pointer pointer = offHeapMemoryBuffer.store( payload );
+        Assert.assertNotNull( pointer );
+        Assert.assertEquals( new String( payload ), new String( offHeapMemoryBuffer.retrieve( pointer ) ) );
+
+        final byte[] otherPayload = MemoryTestUtils.generateRandomPayload( SMALL_PAYLOAD_LENGTH );
+        final Pointer otherPointer = offHeapMemoryBuffer.update( pointer, otherPayload );
+        Assert.assertNotNull( otherPointer );
+        Assert.assertEquals( pointer.start, otherPointer.start );
+        Assert.assertEquals( pointer.end, otherPointer.end );
+        Assert.assertEquals( new String( otherPayload ), new String( offHeapMemoryBuffer.retrieve( otherPointer ) ) );
+
+        final byte[] evenAnotherPayload = MemoryTestUtils.generateRandomPayload( SMALL_PAYLOAD_LENGTH / 2 );
+        final Pointer evenAnotherPointer = offHeapMemoryBuffer.update( pointer, evenAnotherPayload );
+        Assert.assertNotNull( evenAnotherPointer );
+        Assert.assertEquals( pointer.start, evenAnotherPointer.start );
+        Assert.assertEquals( pointer.end, evenAnotherPointer.end );
+        Assert.assertEquals( 4, new String( offHeapMemoryBuffer.retrieve( evenAnotherPointer ) ).length() );
+        Assert.assertTrue( new String( offHeapMemoryBuffer.retrieve( evenAnotherPointer ) )
+            .startsWith( new String( evenAnotherPayload ) ) );
+
+    }
+    
+    
+    @Test
+    public void testAllocate()
+    {
+
+        final int NUMBER_OF_OBJECTS = 10;
+        final int BUFFER_SIZE = NUMBER_OF_OBJECTS * SMALL_PAYLOAD_LENGTH;
+
+        final OffHeapMemoryBuffer offHeapMemoryBuffer = instanciateOffHeapMemoryBuffer( BUFFER_SIZE );
+
+        final byte[] payload1 = MemoryTestUtils.generateRandomPayload( 8 * SMALL_PAYLOAD_LENGTH );
+        final Pointer pointer1 = offHeapMemoryBuffer.store( payload1 );
+        Assert.assertNotNull( pointer1 );
+        Assert.assertEquals( new String( payload1 ), new String( offHeapMemoryBuffer.retrieve( pointer1 ) ) );
+
+        final byte[] payload2 = MemoryTestUtils.generateRandomPayload( 2 * SMALL_PAYLOAD_LENGTH );
+        final Pointer pointer2 = offHeapMemoryBuffer.store( payload2 );
+        Assert.assertNotNull( pointer2 );
+        Assert.assertEquals( new String( payload2 ), new String( offHeapMemoryBuffer.retrieve( pointer2 ) ) );
+
+        offHeapMemoryBuffer.free( pointer1 );
+        
+        final byte[] payload3 = MemoryTestUtils.generateRandomPayload( 2 * SMALL_PAYLOAD_LENGTH );
+        final Pointer pointer3 = offHeapMemoryBuffer.store( payload3 );
+        Assert.assertNotNull( pointer3 );
+        Assert.assertEquals( new String( payload3 ), new String( offHeapMemoryBuffer.retrieve( pointer3 ) ) );
+
+        final byte[] allocatedPayload1 = MemoryTestUtils.generateRandomPayload( 4 * SMALL_PAYLOAD_LENGTH );
+        final Pointer allocatedPointer1 = offHeapMemoryBuffer.allocate( allocatedPayload1.length, -1, -1 );
+        Assert.assertNotNull( allocatedPointer1 );
+        final ByteBuffer buffer1 = allocatedPointer1.directBuffer;
+        Assert.assertNotNull( buffer1 );
+        buffer1.put( allocatedPayload1 );
+        Assert.assertEquals( new String( allocatedPayload1 ), new String( offHeapMemoryBuffer.retrieve( allocatedPointer1 ) ) );
+
+        final byte[] allocatedPayload2 = MemoryTestUtils.generateRandomPayload( 2 * SMALL_PAYLOAD_LENGTH );
+        final Pointer allocatedPointer2 = offHeapMemoryBuffer.allocate( allocatedPayload2.length, -1, -1 );
+        Assert.assertNotNull( allocatedPointer2 );
+        final ByteBuffer buffer2 = allocatedPointer2.directBuffer;
+        Assert.assertNotNull( buffer2 );
+        buffer2.put( allocatedPayload2 );
+        Assert.assertEquals( new String( allocatedPayload2 ), new String( offHeapMemoryBuffer.retrieve( allocatedPointer2 ) ) );
+
+        
+        // Ensure the new allocation has not overwritten other data
+        Assert.assertEquals( new String( payload2 ), new String( offHeapMemoryBuffer.retrieve( pointer2 ) ) );
+        Assert.assertEquals( new String( payload3 ), new String( offHeapMemoryBuffer.retrieve( pointer3 ) ) );
+
+    }
+    
+}

Added: incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/OffHeapMemoryBufferTest.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/OffHeapMemoryBufferTest.java?rev=1228170&view=auto
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/OffHeapMemoryBufferTest.java (added)
+++ incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/OffHeapMemoryBufferTest.java Fri Jan  6 13:33:16 2012
@@ -0,0 +1,52 @@
+package org.apache.directmemory.memory.test;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.directmemory.memory.OffHeapMemoryBuffer;
+import org.apache.directmemory.memory.OffHeapMemoryBufferImpl;
+import org.junit.Test;
+
+public class OffHeapMemoryBufferTest extends AbstractOffHeapMemoryBufferTest
+{
+
+    protected OffHeapMemoryBuffer instanciateOffHeapMemoryBuffer( int bufferSize ) 
+    {
+        return OffHeapMemoryBufferImpl.createNew( bufferSize );
+    }
+
+    @Test
+    public void testFullFillAndFreeAndClearBuffer()
+    {
+        // DIRECTMEMORY-40 : Pointers merging with adjacent free pointers when freeing.
+    }
+    
+    @Test
+    public void testStoreAllocAndFree()
+    {
+        // DIRECTMEMORY-40 : Pointers merging with adjacent free pointers when freeing.
+    }
+    
+    @Test
+    public void testUpdate()
+    {
+        // DIRECTMEMORY-49 : OffHeapMemoryBufferImpl.update does not reuse the same pointer
+    }
+}

Added: incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/OffHeapMergingMemoryBufferTest.java
URL: http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/OffHeapMergingMemoryBufferTest.java?rev=1228170&view=auto
==============================================================================
--- incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/OffHeapMergingMemoryBufferTest.java (added)
+++ incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/OffHeapMergingMemoryBufferTest.java Fri Jan  6 13:33:16 2012
@@ -0,0 +1,34 @@
+package org.apache.directmemory.memory.test;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.directmemory.memory.OffHeapMergingMemoryBufferImpl;
+import org.apache.directmemory.memory.OffHeapMemoryBuffer;
+
+public class OffHeapMergingMemoryBufferTest extends AbstractOffHeapMemoryBufferTest
+{
+
+    protected OffHeapMemoryBuffer instanciateOffHeapMemoryBuffer( int bufferSize ) 
+    {
+        return OffHeapMergingMemoryBufferImpl.createNew( bufferSize );
+    }
+
+}