Posted to commits@geode.apache.org by ds...@apache.org on 2015/11/20 02:07:55 UTC

[20/20] incubator-geode git commit: move FreeListManager from SimpleMemoryAllocatorImpl

move FreeListManager from SimpleMemoryAllocatorImpl


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/3e7da937
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/3e7da937
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/3e7da937

Branch: refs/heads/feature/GEODE-580
Commit: 3e7da9371d61d23c4f785d5e721f2c4b05c47514
Parents: 18f5e9d
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Thu Nov 19 17:06:56 2015 -0800
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Thu Nov 19 17:06:56 2015 -0800

----------------------------------------------------------------------
 .../internal/offheap/FreeListManager.java       | 679 +++++++++++++++++++
 .../offheap/SimpleMemoryAllocatorImpl.java      | 661 +-----------------
 2 files changed, 685 insertions(+), 655 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3e7da937/gemfire-core/src/main/java/com/gemstone/gemfire/internal/offheap/FreeListManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/offheap/FreeListManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/offheap/FreeListManager.java
new file mode 100644
index 0000000..f335b4b
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/offheap/FreeListManager.java
@@ -0,0 +1,679 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.offheap;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+
+import com.gemstone.gemfire.LogWriter;
+import com.gemstone.gemfire.OutOfOffHeapMemoryException;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+
+/**
+ * Manages the free lists for a SimpleMemoryAllocatorImpl
+ */
+public class FreeListManager {
+    final AtomicReferenceArray<SyncChunkStack> tinyFreeLists = new AtomicReferenceArray<SyncChunkStack>(SimpleMemoryAllocatorImpl.TINY_FREE_LIST_COUNT);
+    // hugeChunkSet is sorted by chunk size in ascending order. It will only contain chunks larger than MAX_TINY.
+    final ConcurrentSkipListSet<Chunk> hugeChunkSet = new ConcurrentSkipListSet<Chunk>();
+    private final AtomicLong allocatedSize = new AtomicLong(0L);
+   
+    private int getNearestTinyMultiple(int size) {
+      return (size-1)/SimpleMemoryAllocatorImpl.TINY_MULTIPLE;
+    }
+    public List<Chunk> getLiveChunks() {
+      ArrayList<Chunk> result = new ArrayList<Chunk>();
+      UnsafeMemoryChunk[] slabs = this.ma.getSlabs();
+      for (int i=0; i < slabs.length; i++) {
+        getLiveChunks(slabs[i], result);
+      }
+      return result;
+    }
+    private void getLiveChunks(UnsafeMemoryChunk slab, List<Chunk> result) {
+      long addr = slab.getMemoryAddress();
+      while (addr <= (slab.getMemoryAddress() + slab.getSize() - Chunk.MIN_CHUNK_SIZE)) {
+        Fragment f = isAddrInFragmentFreeSpace(addr);
+        if (f != null) {
+          addr = f.getMemoryAddress() + f.getSize();
+        } else {
+          int curChunkSize = Chunk.getSize(addr);
+          int refCount = Chunk.getRefCount(addr);
+          if (refCount > 0) {
+            result.add(this.ma.chunkFactory.newChunk(addr));
+          }
+          addr += curChunkSize;
+        }
+      }
+    }
+    /**
+     * If addr is in the free space of a fragment then return that fragment; otherwise return null.
+     */
+    private Fragment isAddrInFragmentFreeSpace(long addr) {
+      for (Fragment f: this.fragmentList) {
+        if (addr >= (f.getMemoryAddress() + f.getFreeIndex()) && addr < (f.getMemoryAddress() + f.getSize())) {
+          return f;
+        }
+      }
+      return null;
+    }
+    public long getUsedMemory() {
+      return this.allocatedSize.get();
+    }
+    public long getFreeMemory() {
+      return this.ma.getTotalMemory() - getUsedMemory();
+    }
+    public long getFreeFragmentMemory() {
+      long result = 0;
+      for (Fragment f: this.fragmentList) {
+        int freeSpace = f.freeSpace();
+        if (freeSpace >= Chunk.MIN_CHUNK_SIZE) {
+          result += freeSpace;
+        }
+      }
+      return result;
+    }
+    public long getFreeTinyMemory() {
+      long tinyFree = 0;
+      for (int i=0; i < this.tinyFreeLists.length(); i++) {
+        SyncChunkStack cl = this.tinyFreeLists.get(i);
+        if (cl != null) {
+          tinyFree += cl.computeTotalSize();
+        }
+      }
+      return tinyFree;
+    }
+    public long getFreeHugeMemory() {
+      long hugeFree = 0;
+      for (Chunk c: this.hugeChunkSet) {
+        hugeFree += c.getSize();
+      }
+      return hugeFree;
+    }
+
+    /**
+     * The id of the last fragment we allocated from.
+     */
+    private final AtomicInteger lastFragmentAllocation = new AtomicInteger(0);
+    final CopyOnWriteArrayList<Fragment> fragmentList;
+    private final SimpleMemoryAllocatorImpl ma;
+    
+    public FreeListManager(SimpleMemoryAllocatorImpl ma) {
+      this.ma = ma;
+      UnsafeMemoryChunk[] slabs = ma.getSlabs();
+      Fragment[] tmp = new Fragment[slabs.length];
+      for (int i=0; i < slabs.length; i++) {
+        tmp[i] = new Fragment(slabs[i].getMemoryAddress(), slabs[i].getSize());
+      }
+      this.fragmentList = new CopyOnWriteArrayList<Fragment>(tmp);
+      
+      if(ma.validateMemoryWithFill) {
+        fillFragments();
+      }
+    }
+    
+    /**
+     * Fills all fragments with a fill used for data integrity validation.
+     */
+    private void fillFragments() {
+      for(Fragment fragment : this.fragmentList) {
+        fragment.fill();
+      }
+    }
+    
+    /**
+     * Allocate a chunk of memory of at least the given size.
+     * The basic algorithm is:
+     * 1. Look for a previously allocated and freed chunk close to the size requested.
+     * 2. See if the original chunk is big enough to split. If so, do so.
+     * 3. Look for a previously allocated and freed chunk of any size larger than the one requested.
+     *    If we find one split it.
+     * <p>
+     * It might be better not to include step 3 since we expect a freed chunk to be reallocated in the future.
+     * Maybe it would be better for step 3 to look for adjacent free blocks that can be merged together.
+     * For now we will just try steps 1 and 2 and then report out of memory.
+     * @param size minimum bytes the returned chunk must have.
+     * @param chunkType TODO
+     * @return the allocated chunk
+     * @throws IllegalStateException if a chunk can not be allocated.
+     */
+    @SuppressWarnings("synthetic-access")
+    public Chunk allocate(int size, ChunkType chunkType) {
+      Chunk result = null;
+      {
+        assert size > 0;
+        if (chunkType == null) {
+          chunkType = GemFireChunk.TYPE;
+        }
+        result = basicAllocate(size, true, chunkType);
+        result.setDataSize(size);
+      }
+      this.ma.stats.incObjects(1);
+      int resultSize = result.getSize();
+      this.allocatedSize.addAndGet(resultSize);
+      this.ma.stats.incUsedMemory(resultSize);
+      this.ma.stats.incFreeMemory(-resultSize);
+      result.initializeUseCount();
+      this.ma.notifyListeners();
+      
+      return result;
+    }
+    
+    private Chunk basicAllocate(int size, boolean useSlabs, ChunkType chunkType) {
+      if (useSlabs) {
+        // Every object stored off heap has a header so we need
+        // to adjust the size so that the header gets allocated.
+        // If useSlabs is false then the incoming size has already
+        // been adjusted.
+        size += Chunk.OFF_HEAP_HEADER_SIZE;
+      }
+      if (size <= SimpleMemoryAllocatorImpl.MAX_TINY) {
+        return allocateTiny(size, useSlabs, chunkType);
+      } else {
+        return allocateHuge(size, useSlabs, chunkType);
+      }
+    }
+    
+    private Chunk allocateFromFragments(int chunkSize, ChunkType chunkType) {
+      do {
+        final int lastAllocationId = this.lastFragmentAllocation.get();
+        for (int i=lastAllocationId; i < this.fragmentList.size(); i++) {
+          Chunk result = allocateFromFragment(i, chunkSize, chunkType);
+          if (result != null) {
+            return result;
+          }
+        }
+        for (int i=0; i < lastAllocationId; i++) {
+          Chunk result = allocateFromFragment(i, chunkSize, chunkType);
+          if (result != null) {
+            return result;
+          }
+        }
+      } while (compact(chunkSize));
+      // We tried all the fragments and didn't find any free memory.
+      logOffHeapState(chunkSize);
+      final OutOfOffHeapMemoryException failure = new OutOfOffHeapMemoryException("Out of off-heap memory. Could not allocate size of " + chunkSize);
+      try {
+        throw failure;
+      } finally {
+        this.ma.ooohml.outOfOffHeapMemory(failure);
+      }
+    }
+    
+    private void logOffHeapState(int chunkSize) {
+      if (InternalDistributedSystem.getAnyInstance() != null) {
+        LogWriter lw = InternalDistributedSystem.getAnyInstance().getLogWriter();
+        lw.info("OutOfOffHeapMemory allocating size of " + chunkSize + ". allocated=" + this.allocatedSize.get() + " compactions=" + this.compactCount.get() + " objects=" + this.ma.stats.getObjects() + " free=" + this.ma.stats.getFreeMemory() + " fragments=" + this.ma.stats.getFragments() + " largestFragment=" + this.ma.stats.getLargestFragment() + " fragmentation=" + this.ma.stats.getFragmentation());
+        logFragmentState(lw);
+        logTinyState(lw);
+//        logBigState(lw);
+        logHugeState(lw);
+      }
+    }
+
+    private void logHugeState(LogWriter lw) {
+      for (Chunk c: this.hugeChunkSet) {
+        lw.info("Free huge of size " + c.getSize());
+      }
+    }
+//    private void logBigState(LogWriter lw) {
+//      for (int i=0; i < this.bigFreeLists.length(); i++) {
+//        ConcurrentChunkStack cl = this.bigFreeLists.get(i);
+//        if (cl != null) {
+//          cl.logSizes(lw, "Free big of size ");
+//        }
+//      }
+//    }
+    private void logTinyState(LogWriter lw) {
+      for (int i=0; i < this.tinyFreeLists.length(); i++) {
+        SyncChunkStack cl = this.tinyFreeLists.get(i);
+        if (cl != null) {
+          cl.logSizes(lw, "Free tiny of size ");
+        }
+      }
+    }
+    private void logFragmentState(LogWriter lw) {
+      for (Fragment f: this.fragmentList) {
+        int freeSpace = f.freeSpace();
+        if (freeSpace > 0) {
+          lw.info("Fragment at " + f.getMemoryAddress() + " of size " + f.getSize() + " has " + freeSpace + " bytes free.");
+        }
+      }
+    }
+
+    private final AtomicInteger compactCount = new AtomicInteger();
+    /**
+     * Compacts memory and returns true if enough memory to allocate chunkSize
+     * is freed. Otherwise returns false.
+     * TODO OFFHEAP: what should be done about contiguous chunks that end up being bigger than 2G?
+     * Currently if we are given slabs bigger than 2G or that just happen to be contiguous and add
+     * up to 2G then the compactor may unify them into a single Chunk and our 32-bit chunkSize
+     * field will overflow. This code needs to detect this and just create a chunk of 2G and then start
+     * a new one.
+     * Or to prevent it from happening we could just check the incoming slabs and throw away a few bytes
+     * to keep them from being contiguous.
+     */
+    boolean compact(int chunkSize) {
+      final long startCompactionTime = this.ma.getStats().startCompaction();
+      final int countPreSync = this.compactCount.get();
+      try {
+        synchronized (this) {
+          if (this.compactCount.get() != countPreSync) {
+            // someone else did a compaction while we waited on the sync.
+            // So just return true causing the caller to retry the allocation.
+            return true;
+          }
+          ArrayList<SyncChunkStack> freeChunks = new ArrayList<SyncChunkStack>();
+          collectFreeChunks(freeChunks);
+          final int SORT_ARRAY_BLOCK_SIZE = 128;
+          long[] sorted = new long[SORT_ARRAY_BLOCK_SIZE];
+          int sortedSize = 0;
+          boolean result = false;
+          int largestFragment = 0;
+          for (SyncChunkStack l: freeChunks) {
+            long addr = l.poll();
+            while (addr != 0) {
+              int idx = Arrays.binarySearch(sorted, 0, sortedSize, addr);
+              //System.out.println("DEBUG addr=" + addr + " size=" + Chunk.getSize(addr) + " idx="+idx + " sortedSize=" + sortedSize);
+              if (idx >= 0) {
+                throw new IllegalStateException("duplicate memory address found during compaction!");
+              }
+              idx = -idx;
+              idx--;
+              if (idx == sortedSize) {
+                // addr is > everything in the array
+                if (sortedSize == 0) {
+                  // nothing was in the array
+                  sorted[0] = addr;
+                  sortedSize++;
+                } else {
+                  // see if we can conflate into sorted[idx]
+                  long lowAddr = sorted[idx-1];
+                  int lowSize = Chunk.getSize(lowAddr);
+                  if (lowAddr + lowSize == addr) {
+                    // append the addr chunk to lowAddr
+                    Chunk.setSize(lowAddr, lowSize + Chunk.getSize(addr));
+                  } else {
+                    if (sortedSize >= sorted.length) {
+                      long[] newSorted = new long[sorted.length+SORT_ARRAY_BLOCK_SIZE];
+                      System.arraycopy(sorted, 0, newSorted, 0, sorted.length);
+                      sorted = newSorted;
+                    }
+                    sortedSize++;
+                    sorted[idx] = addr;
+                  }
+                }
+              } else {
+                int addrSize = Chunk.getSize(addr);
+                long highAddr = sorted[idx];
+                if (addr + addrSize == highAddr) {
+                  // append highAddr chunk to addr
+                  Chunk.setSize(addr, addrSize + Chunk.getSize(highAddr));
+                  sorted[idx] = addr;
+                } else {
+                  boolean insert = idx==0;
+                  if (!insert) {
+                    long lowAddr = sorted[idx-1];
+  //                  if (lowAddr == 0L) {
+  //                    long[] tmp = Arrays.copyOf(sorted, sortedSize);
+  //                    throw new IllegalStateException("addr was zero at idx=" + (idx-1) + " sorted="+ Arrays.toString(tmp));
+  //                  }
+                    int lowSize = Chunk.getSize(lowAddr);
+                    if (lowAddr + lowSize == addr) {
+                      // append the addr chunk to lowAddr
+                      Chunk.setSize(lowAddr, lowSize + addrSize);
+                    } else {
+                      insert = true;
+                    }
+                  }
+                  if (insert) {
+                    if (sortedSize >= sorted.length) {
+                      long[] newSorted = new long[sorted.length+SORT_ARRAY_BLOCK_SIZE];
+                      System.arraycopy(sorted, 0, newSorted, 0, idx);
+                      newSorted[idx] = addr;
+                      System.arraycopy(sorted, idx, newSorted, idx+1, sortedSize-idx);
+                      sorted = newSorted;
+                    } else {
+                      System.arraycopy(sorted, idx, sorted, idx+1, sortedSize-idx);
+                      sorted[idx] = addr;
+                    }
+                    sortedSize++;
+                  }
+                }
+              }
+              addr = l.poll();
+            }
+          }
+          for (int i=sortedSize-1; i > 0; i--) {
+            long addr = sorted[i];
+            long lowAddr = sorted[i-1];
+            int lowSize = Chunk.getSize(lowAddr);
+            if (lowAddr + lowSize == addr) {
+              // append addr chunk to lowAddr
+              Chunk.setSize(lowAddr, lowSize + Chunk.getSize(addr));
+              sorted[i] = 0L;
+            }
+          }
+          this.lastFragmentAllocation.set(0);
+          ArrayList<Fragment> tmp = new ArrayList<Fragment>();
+          for (int i=sortedSize-1; i >= 0; i--) {
+            long addr = sorted[i];
+            if (addr == 0L) continue;
+            int addrSize = Chunk.getSize(addr);
+            Fragment f = new Fragment(addr, addrSize);
+            if (addrSize >= chunkSize) {
+              result = true;
+            }
+            if (addrSize > largestFragment) {
+              largestFragment = addrSize;
+              // TODO it might be better to sort them biggest first
+              tmp.add(0, f);
+            } else {
+              tmp.add(f);
+            }
+          }
+          this.fragmentList.addAll(tmp);
+          
+          // Reinitialize fragments with fill pattern data
+          if(this.ma.validateMemoryWithFill) {
+            fillFragments();
+          }
+          
+          // Signal any waiters that a compaction happened.
+          this.compactCount.incrementAndGet();
+          
+          this.ma.getStats().setLargestFragment(largestFragment);
+          this.ma.getStats().setFragments(tmp.size());        
+          updateFragmentation();
+          
+          return result;
+        } // sync
+      } finally {
+        this.ma.getStats().endCompaction(startCompactionTime);
+      }
+    }
+    
+    private void updateFragmentation() {      
+      long freeSize = this.ma.getStats().getFreeMemory();
+
+      // Calculate free space fragmentation only if there is free space available.
+      if(freeSize > 0) {
+        long largestFragment = this.ma.getStats().getLargestFragment();
+        long numerator = freeSize - largestFragment;
+        
+        double percentage = (double) numerator / (double) freeSize;
+        percentage *= 100d;
+        
+        int wholePercentage = (int) Math.rint(percentage);
+        this.ma.getStats().setFragmentation(wholePercentage);
+      } else {
+        // No free space? Then we have no free space fragmentation.
+        this.ma.getStats().setFragmentation(0);
+      }
+    }
+    
+    private void collectFreeChunks(List<SyncChunkStack> l) {
+      collectFreeFragmentChunks(l);
+      collectFreeHugeChunks(l);
+//      collectFreeBigChunks(l);
+      collectFreeTinyChunks(l);
+    }
+    private void collectFreeFragmentChunks(List<SyncChunkStack> l) {
+      if (this.fragmentList.size() == 0) return;
+      SyncChunkStack result = new SyncChunkStack();
+      for (Fragment f: this.fragmentList) {
+        int offset;
+        int diff;
+        do {
+          offset = f.getFreeIndex();
+          diff = f.getSize() - offset;
+        } while (diff >= Chunk.MIN_CHUNK_SIZE && !f.allocate(offset, offset+diff));
+        if (diff < Chunk.MIN_CHUNK_SIZE) {
+          if (diff > 0) {
+            SimpleMemoryAllocatorImpl.logger.debug("Lost memory of size {}", diff);
+          }
+          // fragment is too small to turn into a chunk
+          // TODO we need to make sure this never happens
+          // by keeping sizes rounded. I think I did this
+          // by introducing MIN_CHUNK_SIZE and by rounding
+          // the size of huge allocations.
+          continue;
+        }
+        long chunkAddr = f.getMemoryAddress()+offset;
+        Chunk.setSize(chunkAddr, diff);
+        result.offer(chunkAddr);
+      }
+      // All the fragments have been turned into chunks, so now clear them.
+      // The compaction will create new fragments.
+      this.fragmentList.clear();
+      if (!result.isEmpty()) {
+        l.add(result);
+      }
+    }
+    private void collectFreeTinyChunks(List<SyncChunkStack> l) {
+      for (int i=0; i < this.tinyFreeLists.length(); i++) {
+        SyncChunkStack cl = this.tinyFreeLists.get(i);
+        if (cl != null) {
+          long head = cl.clear();
+          if (head != 0L) {
+            l.add(new SyncChunkStack(head));
+          }
+        }
+      }
+    }
+//    private void collectFreeBigChunks(List<ConcurrentChunkStack> l) {
+//      for (int i=0; i < this.bigFreeLists.length(); i++) {
+//        ConcurrentChunkStack cl = this.bigFreeLists.get(i);
+//        if (cl != null) {
+//          long head = cl.clear();
+//          if (head != 0L) {
+//            l.add(new ConcurrentChunkStack(head));
+//          }
+//        }
+//      }
+//    }
+    public void collectFreeHugeChunks(List<SyncChunkStack> l) {
+      Chunk c = this.hugeChunkSet.pollFirst();
+      SyncChunkStack result = null;
+      while (c != null) {
+        if (result == null) {
+          result = new SyncChunkStack();
+          l.add(result);
+        }
+        result.offer(c.getMemoryAddress());
+        c = this.hugeChunkSet.pollFirst();
+      }
+    }
+    
+    private Chunk allocateFromFragment(final int fragIdx, final int chunkSize, ChunkType chunkType) {
+      if (fragIdx >= this.fragmentList.size()) return null;
+      final Fragment fragment;
+      try {
+        fragment = this.fragmentList.get(fragIdx);
+      } catch (IndexOutOfBoundsException ignore) {
+        // A concurrent compaction can cause this.
+        return null;
+      }
+      boolean retryFragment;
+      do {
+        retryFragment = false;
+        int oldOffset = fragment.getFreeIndex();
+        int fragmentSize = fragment.getSize();
+        int fragmentFreeSize = fragmentSize - oldOffset;
+        if (fragmentFreeSize >= chunkSize) {
+          // this fragment has room
+          // Try to allocate up to BATCH_SIZE more chunks from it
+          int allocSize = chunkSize * SimpleMemoryAllocatorImpl.BATCH_SIZE;
+          if (allocSize > fragmentFreeSize) {
+            allocSize = (fragmentFreeSize / chunkSize) * chunkSize;
+          }
+          int newOffset = oldOffset + allocSize;
+          int extraSize = fragmentSize - newOffset;
+          if (extraSize < Chunk.MIN_CHUNK_SIZE) {
+            // include these last few bytes of the fragment in the allocation.
+            // If we don't then they will be lost forever.
+            // The extraSize bytes only apply to the first chunk we allocate (not the batch ones).
+            newOffset += extraSize;
+          } else {
+            extraSize = 0;
+          }
+          if (fragment.allocate(oldOffset, newOffset)) {
+            // We did the allocate!
+            this.lastFragmentAllocation.set(fragIdx);
+            Chunk result = this.ma.chunkFactory.newChunk(fragment.getMemoryAddress()+oldOffset, chunkSize+extraSize, chunkType);
+            allocSize -= chunkSize+extraSize;
+            oldOffset += extraSize;
+            while (allocSize > 0) {
+              oldOffset += chunkSize;
+              // we add the batch ones immediately to the freelist
+              result.readyForFree();
+              free(result.getMemoryAddress(), false);
+              result = this.ma.chunkFactory.newChunk(fragment.getMemoryAddress()+oldOffset, chunkSize, chunkType);
+              allocSize -= chunkSize;
+            }
+            
+            if(this.ma.validateMemoryWithFill) {
+              result.validateFill();
+            }
+            
+            return result;
+          } else {
+            // TODO OFFHEAP: if batch allocations are disabled should we not call basicAllocate here?
+            // Since we know another thread did a concurrent alloc
+            // that possibly did a batch, check the free list again.
+            Chunk result = basicAllocate(chunkSize, false, chunkType);
+            if (result != null) {
+              return result;
+            }
+            retryFragment = true;
+          }
+        }
+      } while (retryFragment);
+      return null; // did not find enough free space in this fragment
+    }
+
+    private int round(int multiple, int value) {
+      return (int) ((((long)value + (multiple-1)) / multiple) * multiple);
+    }
+    private Chunk allocateTiny(int size, boolean useFragments, ChunkType chunkType) {
+      return basicAllocate(getNearestTinyMultiple(size), SimpleMemoryAllocatorImpl.TINY_MULTIPLE, 0, this.tinyFreeLists, useFragments, chunkType);
+    }
+//    private Chunk allocateBig(int size, boolean useFragments) {
+//      return basicAllocate(getNearestBigMultiple(size), BIG_MULTIPLE, BIG_OFFSET, this.bigFreeLists, useFragments);
+//    }
+    private Chunk basicAllocate(int idx, int multiple, int offset, AtomicReferenceArray<SyncChunkStack> freeLists, boolean useFragments, ChunkType chunkType) {
+      SyncChunkStack clq = freeLists.get(idx);
+      if (clq != null) {
+        long memAddr = clq.poll();
+        if (memAddr != 0) {
+          Chunk result = this.ma.chunkFactory.newChunk(memAddr, chunkType);
+          
+          // Data integrity check.
+          if(this.ma.validateMemoryWithFill) {          
+            result.validateFill();
+          }
+          
+          result.readyForAllocation(chunkType);
+          return result;
+        }
+      }
+      if (useFragments) {
+        return allocateFromFragments(((idx+1)*multiple)+offset, chunkType);
+      } else {
+        return null;
+      }
+    }
+    private Chunk allocateHuge(int size, boolean useFragments, ChunkType chunkType) {
+      // sizeHolder is a fake Chunk used to search our sorted hugeChunkSet.
+      Chunk sizeHolder = new FakeChunk(size);
+      NavigableSet<Chunk> ts = this.hugeChunkSet.tailSet(sizeHolder);
+      Chunk result = ts.pollFirst();
+      if (result != null) {
+        if (result.getSize() - (SimpleMemoryAllocatorImpl.HUGE_MULTIPLE - Chunk.OFF_HEAP_HEADER_SIZE) < size) {
+          // close enough to the requested size; just return it.
+          
+          // Data integrity check.
+          if(this.ma.validateMemoryWithFill) {          
+            result.validateFill();
+          }
+          if (chunkType.getSrcType() != Chunk.getSrcType(result.getMemoryAddress())) {
+            // The java wrapper class that was cached in the huge chunk list is the wrong type.
+            // So allocate a new one and garbage collect the old one.
+            result = this.ma.chunkFactory.newChunk(result.getMemoryAddress(), chunkType);
+          }
+          result.readyForAllocation(chunkType);
+          return result;
+        } else {
+          this.hugeChunkSet.add(result);
+        }
+      }
+      if (useFragments) {
+        // We round it up to the next multiple of TINY_MULTIPLE to make
+        // sure we always have chunks allocated on an 8 byte boundary.
+        return allocateFromFragments(round(SimpleMemoryAllocatorImpl.TINY_MULTIPLE, size), chunkType);
+      } else {
+        return null;
+      }
+    }
+    
+    @SuppressWarnings("synthetic-access")
+    public void free(long addr) {
+      free(addr, true);
+    }
+    
+    private void free(long addr, boolean updateStats) {
+      int cSize = Chunk.getSize(addr);
+      if (updateStats) {
+        this.ma.stats.incObjects(-1);
+        this.allocatedSize.addAndGet(-cSize);
+        this.ma.stats.incUsedMemory(-cSize);
+        this.ma.stats.incFreeMemory(cSize);
+        this.ma.notifyListeners();
+      }
+      if (cSize <= SimpleMemoryAllocatorImpl.MAX_TINY) {
+        freeTiny(addr, cSize);
+      } else {
+        freeHuge(addr, cSize);
+      }
+    }
+    private void freeTiny(long addr, int cSize) {
+      basicFree(addr, getNearestTinyMultiple(cSize), this.tinyFreeLists);
+    }
+    private void basicFree(long addr, int idx, AtomicReferenceArray<SyncChunkStack> freeLists) {
+      SyncChunkStack clq = freeLists.get(idx);
+      if (clq != null) {
+        clq.offer(addr);
+      } else {
+        clq = new SyncChunkStack();
+        clq.offer(addr);
+        if (!freeLists.compareAndSet(idx, null, clq)) {
+          clq = freeLists.get(idx);
+          clq.offer(addr);
+        }
+      }
+      
+    }
+    private void freeHuge(long addr, int cSize) {
+      this.hugeChunkSet.add(this.ma.chunkFactory.newChunk(addr)); // TODO make this a collection of longs
+    }
+  }
\ No newline at end of file
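
A minimal standalone sketch (not part of the commit) of the size-class routing
that basicAllocate/allocateTiny/allocateHuge above implement: the off-heap header
is added to the requested size, sizes up to MAX_TINY are served from a tiny free
list chosen by (size-1)/TINY_MULTIPLE, and anything larger is rounded up to a
TINY_MULTIPLE boundary and treated as a huge allocation. The constant values
below are illustrative placeholders, not the real ones from SimpleMemoryAllocatorImpl.

public class SizeClassSketch {
  // Placeholder values; the real constants are defined in SimpleMemoryAllocatorImpl.
  static final int TINY_MULTIPLE = 8;
  static final int TINY_FREE_LIST_COUNT = 16;
  static final int MAX_TINY = TINY_MULTIPLE * TINY_FREE_LIST_COUNT;
  static final int OFF_HEAP_HEADER_SIZE = 8;

  // Mirrors getNearestTinyMultiple: which tiny free list a chunk size maps to.
  static int tinyBucket(int size) {
    return (size - 1) / TINY_MULTIPLE;
  }

  // Mirrors round(multiple, value): round value up to the next multiple.
  static int roundUp(int multiple, int value) {
    return (int) ((((long) value + (multiple - 1)) / multiple) * multiple);
  }

  public static void main(String[] args) {
    int requested = 100;                          // bytes the caller asked for
    int size = requested + OFF_HEAP_HEADER_SIZE;  // every off-heap chunk carries a header
    if (size <= MAX_TINY) {
      System.out.println("tiny free list index " + tinyBucket(size));
    } else {
      System.out.println("huge allocation of " + roundUp(TINY_MULTIPLE, size) + " bytes");
    }
  }
}

The coalescing pass inside compact() can be pictured the same way: free chunk
addresses are collected into an address-sorted structure and neighbours whose
ranges touch are merged into one larger chunk before new Fragments are built.
A tiny sketch with made-up addresses, using a map in place of Chunk.getSize/setSize:

import java.util.TreeMap;

public class CoalesceSketch {
  public static void main(String[] args) {
    TreeMap<Long, Integer> free = new TreeMap<>(); // address -> size, sorted by address
    free.put(1000L, 24);
    free.put(1024L, 40);   // 1000 + 24 == 1024, so it touches the previous chunk
    free.put(2048L, 16);   // not adjacent to anything

    Long prev = null;
    for (Long addr : new TreeMap<>(free).keySet()) { // iterate over a snapshot
      if (prev != null && prev + free.get(prev) == addr) {
        free.put(prev, free.get(prev) + free.remove(addr)); // merge addr into prev
      } else {
        prev = addr;
      }
    }
    System.out.println(free); // {1000=64, 2048=16}
  }
}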

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3e7da937/gemfire-core/src/main/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorImpl.java
index 0cb7068..688fbe0 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorImpl.java
@@ -24,25 +24,19 @@ import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.NavigableSet;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceArray;
 
 import org.apache.logging.log4j.Logger;
 
 import com.gemstone.gemfire.LogWriter;
-import com.gemstone.gemfire.OutOfOffHeapMemoryException;
 import com.gemstone.gemfire.cache.CacheClosedException;
 import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
 import com.gemstone.gemfire.internal.cache.BucketRegion;
 import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 import com.gemstone.gemfire.internal.cache.LocalRegion;
@@ -68,7 +62,7 @@ import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
  */
 public final class SimpleMemoryAllocatorImpl implements MemoryAllocator, MemoryInspector {
 
-  private static final Logger logger = LogService.getLogger();
+  static final Logger logger = LogService.getLogger();
   
   public static final String FREE_OFF_HEAP_MEMORY_PROPERTY = "gemfire.free-off-heap-memory";
   
@@ -91,9 +85,9 @@ public final class SimpleMemoryAllocatorImpl implements MemoryAllocator, MemoryI
   public final static int MAX_TINY = TINY_MULTIPLE*TINY_FREE_LIST_COUNT;
   public final static int HUGE_MULTIPLE = 256;
   
-  private volatile OffHeapMemoryStats stats;
+  volatile OffHeapMemoryStats stats;
   
-  private volatile OutOfOffHeapMemoryListener ooohml;
+  volatile OutOfOffHeapMemoryListener ooohml;
   
   /** The MemoryChunks that this allocator is managing by allocating smaller chunks of them.
    * The contents of this array never change.
@@ -108,7 +102,7 @@ public final class SimpleMemoryAllocatorImpl implements MemoryAllocator, MemoryI
   
   private static SimpleMemoryAllocatorImpl singleton = null;
   private static final AtomicReference<Thread> asyncCleanupThread = new AtomicReference<Thread>();
-  private final ChunkFactory chunkFactory;
+  final ChunkFactory chunkFactory;
   
   public static SimpleMemoryAllocatorImpl getAllocator() {
     SimpleMemoryAllocatorImpl result = singleton;
@@ -296,7 +290,7 @@ public final class SimpleMemoryAllocatorImpl implements MemoryAllocator, MemoryI
     this.stats.incMaxMemory(this.totalSlabSize);
     this.stats.incFreeMemory(this.totalSlabSize);
     
-    this.freeList = new FreeListManager();
+    this.freeList = new FreeListManager(this);
   }
   
   public List<Chunk> getLostChunks() {
@@ -567,7 +561,7 @@ public final class SimpleMemoryAllocatorImpl implements MemoryAllocator, MemoryI
     }
   }
   
-  private void notifyListeners() {
+  void notifyListeners() {
     final MemoryUsageListener[] savedListeners = this.memoryUsageListeners;
     
     if (savedListeners.length == 0) {
@@ -580,649 +574,6 @@ public final class SimpleMemoryAllocatorImpl implements MemoryAllocator, MemoryI
     }
   }
   
-  public class FreeListManager {
-    private final AtomicReferenceArray<SyncChunkStack> tinyFreeLists = new AtomicReferenceArray<SyncChunkStack>(TINY_FREE_LIST_COUNT);
-    // hugeChunkSet is sorted by chunk size in ascending order. It will only contain chunks larger than MAX_TINY.
-    private final ConcurrentSkipListSet<Chunk> hugeChunkSet = new ConcurrentSkipListSet<Chunk>();
-    private final AtomicLong allocatedSize = new AtomicLong(0L);
-   
-    private int getNearestTinyMultiple(int size) {
-      return (size-1)/TINY_MULTIPLE;
-    }
-    public List<Chunk> getLiveChunks() {
-      ArrayList<Chunk> result = new ArrayList<Chunk>();
-      UnsafeMemoryChunk[] slabs = getSlabs();
-      for (int i=0; i < slabs.length; i++) {
-        getLiveChunks(slabs[i], result);
-      }
-      return result;
-    }
-    private void getLiveChunks(UnsafeMemoryChunk slab, List<Chunk> result) {
-      long addr = slab.getMemoryAddress();
-      while (addr <= (slab.getMemoryAddress() + slab.getSize() - Chunk.MIN_CHUNK_SIZE)) {
-        Fragment f = isAddrInFragmentFreeSpace(addr);
-        if (f != null) {
-          addr = f.getMemoryAddress() + f.getSize();
-        } else {
-          int curChunkSize = Chunk.getSize(addr);
-          int refCount = Chunk.getRefCount(addr);
-          if (refCount > 0) {
-            result.add(SimpleMemoryAllocatorImpl.this.chunkFactory.newChunk(addr));
-          }
-          addr += curChunkSize;
-        }
-      }
-    }
-    /**
-     * If addr is in the free space of a fragment then return that fragment; otherwise return null.
-     */
-    private Fragment isAddrInFragmentFreeSpace(long addr) {
-      for (Fragment f: this.fragmentList) {
-        if (addr >= (f.getMemoryAddress() + f.getFreeIndex()) && addr < (f.getMemoryAddress() + f.getSize())) {
-          return f;
-        }
-      }
-      return null;
-    }
-    public long getUsedMemory() {
-      return this.allocatedSize.get();
-    }
-    public long getFreeMemory() {
-      return getTotalMemory() - getUsedMemory();
-    }
-    public long getFreeFragmentMemory() {
-      long result = 0;
-      for (Fragment f: this.fragmentList) {
-        int freeSpace = f.freeSpace();
-        if (freeSpace >= Chunk.MIN_CHUNK_SIZE) {
-          result += freeSpace;
-        }
-      }
-      return result;
-    }
-    public long getFreeTinyMemory() {
-      long tinyFree = 0;
-      for (int i=0; i < this.tinyFreeLists.length(); i++) {
-        SyncChunkStack cl = this.tinyFreeLists.get(i);
-        if (cl != null) {
-          tinyFree += cl.computeTotalSize();
-        }
-      }
-      return tinyFree;
-    }
-    public long getFreeHugeMemory() {
-      long hugeFree = 0;
-      for (Chunk c: this.hugeChunkSet) {
-        hugeFree += c.getSize();
-      }
-      return hugeFree;
-    }
-
-    /**
-     * The id of the last fragment we allocated from.
-     */
-    private final AtomicInteger lastFragmentAllocation = new AtomicInteger(0);
-
-    private final CopyOnWriteArrayList<Fragment> fragmentList;
-    public FreeListManager() {
-      UnsafeMemoryChunk[] slabs = getSlabs();
-      Fragment[] tmp = new Fragment[slabs.length];
-      for (int i=0; i < slabs.length; i++) {
-        tmp[i] = new Fragment(slabs[i].getMemoryAddress(), slabs[i].getSize());
-      }
-      this.fragmentList = new CopyOnWriteArrayList<Fragment>(tmp);
-      
-      if(validateMemoryWithFill) {
-        fillFragments();
-      }
-    }
-    
-    /**
-     * Fills all fragments with a fill used for data integrity validation.
-     */
-    private void fillFragments() {
-      for(Fragment fragment : this.fragmentList) {
-        fragment.fill();
-      }
-    }
-    
-    /**
-     * Allocate a chunk of memory of at least the given size.
-     * The basic algorithm is:
-     * 1. Look for a previously allocated and freed chunk close to the size requested.
-     * 2. See if the original chunk is big enough to split. If so do so.
-     * 3. Look for a previously allocated and freed chunk of any size larger than the one requested.
-     *    If we find one split it.
-     * <p>
-     * It might be better not to include step 3 since we expect and freed chunk to be reallocated in the future.
-     * Maybe it would be better for 3 to look for adjacent free blocks that can be merged together.
-     * For now we will just try 1 and 2 and then report out of mem.
-     * @param size minimum bytes the returned chunk must have.
-     * @param chunkType TODO
-     * @return the allocated chunk
-     * @throws IllegalStateException if a chunk can not be allocated.
-     */
-    @SuppressWarnings("synthetic-access")
-    public Chunk allocate(int size, ChunkType chunkType) {
-      Chunk result = null;
-      {
-        assert size > 0;
-        if (chunkType == null) {
-          chunkType = GemFireChunk.TYPE;
-        }
-        result = basicAllocate(size, true, chunkType);
-        result.setDataSize(size);
-      }
-      stats.incObjects(1);
-      int resultSize = result.getSize();
-      this.allocatedSize.addAndGet(resultSize);
-      stats.incUsedMemory(resultSize);
-      stats.incFreeMemory(-resultSize);
-      result.initializeUseCount();
-      notifyListeners();
-      
-      return result;
-    }
-    
-    private Chunk basicAllocate(int size, boolean useSlabs, ChunkType chunkType) {
-      if (useSlabs) {
-        // Every object stored off heap has a header so we need
-        // to adjust the size so that the header gets allocated.
-        // If useSlabs is false then the incoming size has already
-        // been adjusted.
-        size += Chunk.OFF_HEAP_HEADER_SIZE;
-      }
-      if (size <= MAX_TINY) {
-        return allocateTiny(size, useSlabs, chunkType);
-      } else {
-        return allocateHuge(size, useSlabs, chunkType);
-      }
-    }
-    
-    private Chunk allocateFromFragments(int chunkSize, ChunkType chunkType) {
-      do {
-        final int lastAllocationId = this.lastFragmentAllocation.get();
-        for (int i=lastAllocationId; i < this.fragmentList.size(); i++) {
-          Chunk result = allocateFromFragment(i, chunkSize, chunkType);
-          if (result != null) {
-            return result;
-          }
-        }
-        for (int i=0; i < lastAllocationId; i++) {
-          Chunk result = allocateFromFragment(i, chunkSize, chunkType);
-          if (result != null) {
-            return result;
-          }
-        }
-      } while (compact(chunkSize));
-      // We tried all the fragments and didn't find any free memory.
-      logOffHeapState(chunkSize);
-      final OutOfOffHeapMemoryException failure = new OutOfOffHeapMemoryException("Out of off-heap memory. Could not allocate size of " + chunkSize);
-      try {
-        throw failure;
-      } finally {
-        SimpleMemoryAllocatorImpl.this.ooohml.outOfOffHeapMemory(failure);
-      }
-    }
-    
-    private void logOffHeapState(int chunkSize) {
-      if (InternalDistributedSystem.getAnyInstance() != null) {
-        LogWriter lw = InternalDistributedSystem.getAnyInstance().getLogWriter();
-        lw.info("OutOfOffHeapMemory allocating size of " + chunkSize + ". allocated=" + this.allocatedSize.get() + " compactions=" + this.compactCount.get() + " objects=" + stats.getObjects() + " free=" + stats.getFreeMemory() + " fragments=" + stats.getFragments() + " largestFragment=" + stats.getLargestFragment() + " fragmentation=" + stats.getFragmentation());
-        logFragmentState(lw);
-        logTinyState(lw);
-//        logBigState(lw);
-        logHugeState(lw);
-      }
-    }
-
-    private void logHugeState(LogWriter lw) {
-      for (Chunk c: this.hugeChunkSet) {
-        lw.info("Free huge of size " + c.getSize());
-      }
-    }
-//    private void logBigState(LogWriter lw) {
-//      for (int i=0; i < this.bigFreeLists.length(); i++) {
-//        ConcurrentChunkStack cl = this.bigFreeLists.get(i);
-//        if (cl != null) {
-//          cl.logSizes(lw, "Free big of size ");
-//        }
-//      }
-//    }
-    private void logTinyState(LogWriter lw) {
-      for (int i=0; i < this.tinyFreeLists.length(); i++) {
-        SyncChunkStack cl = this.tinyFreeLists.get(i);
-        if (cl != null) {
-          cl.logSizes(lw, "Free tiny of size ");
-        }
-      }
-    }
-    private void logFragmentState(LogWriter lw) {
-      for (Fragment f: this.fragmentList) {
-        int freeSpace = f.freeSpace();
-        if (freeSpace > 0) {
-          lw.info("Fragment at " + f.getMemoryAddress() + " of size " + f.getSize() + " has " + freeSpace + " bytes free.");
-        }
-      }
-    }
-
-    private final AtomicInteger compactCount = new AtomicInteger();
-    /**
-     * Compacts memory and returns true if enough memory to allocate chunkSize
-     * is freed. Otherwise returns false;
-     * TODO OFFHEAP: what should be done about contiguous chunks that end up being bigger than 2G?
-     * Currently if we are given slabs bigger than 2G or that just happen to be contiguous and add
-     * up to 2G then the compactor may unify them together into a single Chunk and our 32-bit chunkSize
-     * field will overflow. This code needs to detect this and just create a chunk of 2G and then start
-     * a new one.
-     * Or to prevent it from happening we could just check the incoming slabs and throw away a few bytes
-     * to keep them from being contiguous.
-     */
-    private boolean compact(int chunkSize) {
-      final long startCompactionTime = getStats().startCompaction();
-      final int countPreSync = this.compactCount.get();
-      try {
-        synchronized (this) {
-          if (this.compactCount.get() != countPreSync) {
-            // someone else did a compaction while we waited on the sync.
-            // So just return true causing the caller to retry the allocation.
-            return true;
-          }
-          ArrayList<SyncChunkStack> freeChunks = new ArrayList<SyncChunkStack>();
-          collectFreeChunks(freeChunks);
-          final int SORT_ARRAY_BLOCK_SIZE = 128;
-          long[] sorted = new long[SORT_ARRAY_BLOCK_SIZE];
-          int sortedSize = 0;
-          boolean result = false;
-          int largestFragment = 0;
-          for (SyncChunkStack l: freeChunks) {
-            long addr = l.poll();
-            while (addr != 0) {
-              int idx = Arrays.binarySearch(sorted, 0, sortedSize, addr);
-              //System.out.println("DEBUG addr=" + addr + " size=" + Chunk.getSize(addr) + " idx="+idx + " sortedSize=" + sortedSize);
-              if (idx >= 0) {
-                throw new IllegalStateException("duplicate memory address found during compaction!");
-              }
-              idx = -idx;
-              idx--;
-              if (idx == sortedSize) {
-                // addr is > everything in the array
-                if (sortedSize == 0) {
-                  // nothing was in the array
-                  sorted[0] = addr;
-                  sortedSize++;
-                } else {
-                  // see if we can conflate into sorted[idx]
-                  long lowAddr = sorted[idx-1];
-                  int lowSize = Chunk.getSize(lowAddr);
-                  if (lowAddr + lowSize == addr) {
-                    // append the addr chunk to lowAddr
-                    Chunk.setSize(lowAddr, lowSize + Chunk.getSize(addr));
-                  } else {
-                    if (sortedSize >= sorted.length) {
-                      long[] newSorted = new long[sorted.length+SORT_ARRAY_BLOCK_SIZE];
-                      System.arraycopy(sorted, 0, newSorted, 0, sorted.length);
-                      sorted = newSorted;
-                    }
-                    sortedSize++;
-                    sorted[idx] = addr;
-                  }
-                }
-              } else {
-                int addrSize = Chunk.getSize(addr);
-                long highAddr = sorted[idx];
-                if (addr + addrSize == highAddr) {
-                  // append highAddr chunk to addr
-                  Chunk.setSize(addr, addrSize + Chunk.getSize(highAddr));
-                  sorted[idx] = addr;
-                } else {
-                  boolean insert = idx==0;
-                  if (!insert) {
-                    long lowAddr = sorted[idx-1];
-  //                  if (lowAddr == 0L) {
-  //                    long[] tmp = Arrays.copyOf(sorted, sortedSize);
-  //                    throw new IllegalStateException("addr was zero at idx=" + (idx-1) + " sorted="+ Arrays.toString(tmp));
-  //                  }
-                    int lowSize = Chunk.getSize(lowAddr);
-                    if (lowAddr + lowSize == addr) {
-                      // append the addr chunk to lowAddr
-                      Chunk.setSize(lowAddr, lowSize + addrSize);
-                    } else {
-                      insert = true;
-                    }
-                  }
-                  if (insert) {
-                    if (sortedSize >= sorted.length) {
-                      long[] newSorted = new long[sorted.length+SORT_ARRAY_BLOCK_SIZE];
-                      System.arraycopy(sorted, 0, newSorted, 0, idx);
-                      newSorted[idx] = addr;
-                      System.arraycopy(sorted, idx, newSorted, idx+1, sortedSize-idx);
-                      sorted = newSorted;
-                    } else {
-                      System.arraycopy(sorted, idx, sorted, idx+1, sortedSize-idx);
-                      sorted[idx] = addr;
-                    }
-                    sortedSize++;
-                  }
-                }
-              }
-              addr = l.poll();
-            }
-          }
-          for (int i=sortedSize-1; i > 0; i--) {
-            long addr = sorted[i];
-            long lowAddr = sorted[i-1];
-            int lowSize = Chunk.getSize(lowAddr);
-            if (lowAddr + lowSize == addr) {
-              // append addr chunk to lowAddr
-              Chunk.setSize(lowAddr, lowSize + Chunk.getSize(addr));
-              sorted[i] = 0L;
-            }
-          }
-          this.lastFragmentAllocation.set(0);
-          ArrayList<Fragment> tmp = new ArrayList<Fragment>();
-          for (int i=sortedSize-1; i >= 0; i--) {
-            long addr = sorted[i];
-            if (addr == 0L) continue;
-            int addrSize = Chunk.getSize(addr);
-            Fragment f = new Fragment(addr, addrSize);
-            if (addrSize >= chunkSize) {
-              result = true;
-            }
-            if (addrSize > largestFragment) {
-              largestFragment = addrSize;
-              // TODO it might be better to sort them biggest first
-              tmp.add(0, f);
-            } else {
-              tmp.add(f);
-            }
-          }
-          this.fragmentList.addAll(tmp);
-          
-          // Reinitialize fragments with fill pattern data
-          if(validateMemoryWithFill) {
-            fillFragments();
-          }
-          
-          // Signal any waiters that a compaction happened.
-          this.compactCount.incrementAndGet();
-          
-          getStats().setLargestFragment(largestFragment);
-          getStats().setFragments(tmp.size());        
-          updateFragmentation();
-          
-          return result;
-        } // sync
-      } finally {
-        getStats().endCompaction(startCompactionTime);
-      }
-    }
-    
-    private void updateFragmentation() {      
-      long freeSize = getStats().getFreeMemory();
-
-      // Calculate free space fragmentation only if there is free space available.
-      if(freeSize > 0) {
-        long largestFragment = getStats().getLargestFragment();
-        long numerator = freeSize - largestFragment;
-        
-        double percentage = (double) numerator / (double) freeSize;
-        percentage *= 100d;
-        
-        int wholePercentage = (int) Math.rint(percentage);
-        getStats().setFragmentation(wholePercentage);
-      } else {
-        // No free space? Then we have no free space fragmentation.
-        getStats().setFragmentation(0);
-      }
-    }
-    
-    private void collectFreeChunks(List<SyncChunkStack> l) {
-      collectFreeFragmentChunks(l);
-      collectFreeHugeChunks(l);
-//      collectFreeBigChunks(l);
-      collectFreeTinyChunks(l);
-    }
-    private void collectFreeFragmentChunks(List<SyncChunkStack> l) {
-      if (this.fragmentList.size() == 0) return;
-      SyncChunkStack result = new SyncChunkStack();
-      for (Fragment f: this.fragmentList) {
-        int offset;
-        int diff;
-        do {
-          offset = f.getFreeIndex();
-          diff = f.getSize() - offset;
-        } while (diff >= Chunk.MIN_CHUNK_SIZE && !f.allocate(offset, offset+diff));
-        if (diff < Chunk.MIN_CHUNK_SIZE) {
-          if (diff > 0) {
-            logger.debug("Lost memory of size {}", diff);
-          }
-          // fragment is too small to turn into a chunk
-          // TODO we need to make sure this never happens
-          // by keeping sizes rounded. I think I did this
-          // by introducing MIN_CHUNK_SIZE and by rounding
-          // the size of huge allocations.
-          continue;
-        }
-        long chunkAddr = f.getMemoryAddress()+offset;
-        Chunk.setSize(chunkAddr, diff);
-        result.offer(chunkAddr);
-      }
-      // All the fragments have been turned in to chunks so now clear them
-      // The compaction will create new fragments.
-      this.fragmentList.clear();
-      if (!result.isEmpty()) {
-        l.add(result);
-      }
-    }
-    private void collectFreeTinyChunks(List<SyncChunkStack> l) {
-      for (int i=0; i < this.tinyFreeLists.length(); i++) {
-        SyncChunkStack cl = this.tinyFreeLists.get(i);
-        if (cl != null) {
-          long head = cl.clear();
-          if (head != 0L) {
-            l.add(new SyncChunkStack(head));
-          }
-        }
-      }
-    }
-//    private void collectFreeBigChunks(List<ConcurrentChunkStack> l) {
-//      for (int i=0; i < this.bigFreeLists.length(); i++) {
-//        ConcurrentChunkStack cl = this.bigFreeLists.get(i);
-//        if (cl != null) {
-//          long head = cl.clear();
-//          if (head != 0L) {
-//            l.add(new ConcurrentChunkStack(head));
-//          }
-//        }
-//      }
-//    }
-    public void collectFreeHugeChunks(List<SyncChunkStack> l) {
-      Chunk c = this.hugeChunkSet.pollFirst();
-      SyncChunkStack result = null;
-      while (c != null) {
-        if (result == null) {
-          result = new SyncChunkStack();
-          l.add(result);
-        }
-        result.offer(c.getMemoryAddress());
-        c = this.hugeChunkSet.pollFirst();
-      }
-    }
-    
-    private Chunk allocateFromFragment(final int fragIdx, final int chunkSize, ChunkType chunkType) {
-      if (fragIdx >= this.fragmentList.size()) return null;
-      final Fragment fragment;
-      try {
-        fragment = this.fragmentList.get(fragIdx);
-      } catch (IndexOutOfBoundsException ignore) {
-        // A concurrent compaction can cause this.
-        return null;
-      }
-      boolean retryFragment;
-      do {
-        retryFragment = false;
-        int oldOffset = fragment.getFreeIndex();
-        int fragmentSize = fragment.getSize();
-        int fragmentFreeSize = fragmentSize - oldOffset;
-        if (fragmentFreeSize >= chunkSize) {
-          // this fragment has room
-          // Try to allocate up to BATCH_SIZE more chunks from it
-          int allocSize = chunkSize * BATCH_SIZE;
-          if (allocSize > fragmentFreeSize) {
-            allocSize = (fragmentFreeSize / chunkSize) * chunkSize;
-          }
-          int newOffset = oldOffset + allocSize;
-          int extraSize = fragmentSize - newOffset;
-          if (extraSize < Chunk.MIN_CHUNK_SIZE) {
-            // include these last few bytes of the fragment in the allocation.
-            // If we don't then they will be lost forever.
-            // The extraSize bytes only apply to the first chunk we allocate (not the batch ones).
-            newOffset += extraSize;
-          } else {
-            extraSize = 0;
-          }
-          if (fragment.allocate(oldOffset, newOffset)) {
-            // We did the allocate!
-            this.lastFragmentAllocation.set(fragIdx);
-            Chunk result = chunkFactory.newChunk(fragment.getMemoryAddress()+oldOffset, chunkSize+extraSize, chunkType);
-            allocSize -= chunkSize+extraSize;
-            oldOffset += extraSize;
-            while (allocSize > 0) {
-              oldOffset += chunkSize;
-              // we add the batch ones immediately to the freelist
-              result.readyForFree();
-              free(result.getMemoryAddress(), false);
-              result = chunkFactory.newChunk(fragment.getMemoryAddress()+oldOffset, chunkSize, chunkType);
-              allocSize -= chunkSize;
-            }
-            
-            if(validateMemoryWithFill) {
-              result.validateFill();
-            }
-            
-            return result;
-          } else {
-            // TODO OFFHEAP: if batch allocations are disabled should we not call basicAllocate here?
-            // Since we know another thread did a concurrent alloc
-            // that possibly did a batch check the free list again.
-            Chunk result = basicAllocate(chunkSize, false, chunkType);
-            if (result != null) {
-              return result;
-            }
-            retryFragment = true;
-          }
-        }
-      } while (retryFragment);
-      return null; // did not find enough free space in this fragment
-    }
-
-    private int round(int multiple, int value) {
-      return (int) ((((long)value + (multiple-1)) / multiple) * multiple);
-    }
-    private Chunk allocateTiny(int size, boolean useFragments, ChunkType chunkType) {
-      return basicAllocate(getNearestTinyMultiple(size), TINY_MULTIPLE, 0, this.tinyFreeLists, useFragments, chunkType);
-    }
-//    private Chunk allocateBig(int size, boolean useFragments) {
-//      return basicAllocate(getNearestBigMultiple(size), BIG_MULTIPLE, BIG_OFFSET, this.bigFreeLists, useFragments);
-//    }
-    private Chunk basicAllocate(int idx, int multiple, int offset, AtomicReferenceArray<SyncChunkStack> freeLists, boolean useFragments, ChunkType chunkType) {
-      SyncChunkStack clq = freeLists.get(idx);
-      if (clq != null) {
-        long memAddr = clq.poll();
-        if (memAddr != 0) {
-          Chunk result = SimpleMemoryAllocatorImpl.this.chunkFactory.newChunk(memAddr, chunkType);
-          
-          // Data integrity check.
-          if(validateMemoryWithFill) {          
-            result.validateFill();
-          }
-          
-          result.readyForAllocation(chunkType);
-          return result;
-        }
-      }
-      if (useFragments) {
-        return allocateFromFragments(((idx+1)*multiple)+offset, chunkType);
-      } else {
-        return null;
-      }
-    }
-    private Chunk allocateHuge(int size, boolean useFragments, ChunkType chunkType) {
-      // sizeHolder is a fake Chunk used to search our sorted hugeChunkSet.
-      Chunk sizeHolder = new FakeChunk(size);
-      NavigableSet<Chunk> ts = this.hugeChunkSet.tailSet(sizeHolder);
-      Chunk result = ts.pollFirst();
-      if (result != null) {
-        if (result.getSize() - (HUGE_MULTIPLE - Chunk.OFF_HEAP_HEADER_SIZE) < size) {
-          // close enough to the requested size; just return it.
-          
-          // Data integrity check.
-          if(validateMemoryWithFill) {          
-            result.validateFill();
-          }
-          if (chunkType.getSrcType() != Chunk.getSrcType(result.getMemoryAddress())) {
-            // The java wrapper class that was cached in the huge chunk list is the wrong type.
-            // So allocate a new one and garbage collect the old one.
-            result = SimpleMemoryAllocatorImpl.this.chunkFactory.newChunk(result.getMemoryAddress(), chunkType);
-          }
-          result.readyForAllocation(chunkType);
-          return result;
-        } else {
-          this.hugeChunkSet.add(result);
-        }
-      }
-      if (useFragments) {
-        // We round it up to the next multiple of TINY_MULTIPLE to make
-        // sure we always have chunks allocated on an 8 byte boundary.
-        return allocateFromFragments(round(TINY_MULTIPLE, size), chunkType);
-      } else {
-        return null;
-      }
-    }
-    
-    @SuppressWarnings("synthetic-access")
-    public void free(long addr) {
-      free(addr, true);
-    }
-    
-    private void free(long addr, boolean updateStats) {
-      int cSize = Chunk.getSize(addr);
-      if (updateStats) {
-        stats.incObjects(-1);
-        this.allocatedSize.addAndGet(-cSize);
-        stats.incUsedMemory(-cSize);
-        stats.incFreeMemory(cSize);
-        notifyListeners();
-      }
-      if (cSize <= MAX_TINY) {
-        freeTiny(addr, cSize);
-      } else {
-        freeHuge(addr, cSize);
-      }
-    }
-    private void freeTiny(long addr, int cSize) {
-      basicFree(addr, getNearestTinyMultiple(cSize), this.tinyFreeLists);
-    }
-    private void basicFree(long addr, int idx, AtomicReferenceArray<SyncChunkStack> freeLists) {
-      SyncChunkStack clq = freeLists.get(idx);
-      if (clq != null) {
-        clq.offer(addr);
-      } else {
-        clq = new SyncChunkStack();
-        clq.offer(addr);
-        if (!freeLists.compareAndSet(idx, null, clq)) {
-          clq = freeLists.get(idx);
-          clq.offer(addr);
-        }
-      }
-      
-    }
-    private void freeHuge(long addr, int cSize) {
-      this.hugeChunkSet.add(SimpleMemoryAllocatorImpl.this.chunkFactory.newChunk(addr)); // TODO make this a collection of longs
-    }
-  }
-  
   static void validateAddress(long addr) {
     validateAddressAndSize(addr, -1);
   }
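
The SimpleMemoryAllocatorImpl side of the diff is the usual extract-class refactoring:
members the old inner class reached implicitly (logger, stats, ooohml, chunkFactory,
notifyListeners) are widened from private to package-private, and the new top-level
FreeListManager is handed the allocator through its constructor. A minimal sketch of
that pattern follows; the class and member names are hypothetical stand-ins, not code
from this commit.

class Allocator {                              // stands in for SimpleMemoryAllocatorImpl
  final Stats stats = new Stats();             // was private; now package-visible
  final Manager freeList = new Manager(this);  // was an inner-class instance with an implicit outer reference
}

class Manager {                                // stands in for the extracted FreeListManager
  private final Allocator ma;                  // explicit back-reference replaces the implicit one
  Manager(Allocator ma) { this.ma = ma; }
  void allocate() { ma.stats.incObjects(1); }  // what used to be an unqualified outer-field access
}

class Stats {
  void incObjects(int delta) { /* no-op for the sketch */ }
}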