You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2015/11/24 19:11:43 UTC

[2/4] incubator-geode git commit: GEODE-580: cleanup off-heap code

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4b0925e3/gemfire-core/src/main/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorImpl.java
index c800335..f16253e 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorImpl.java
@@ -16,69 +16,30 @@
  */
 package com.gemstone.gemfire.internal.offheap;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.NavigableSet;
-import java.util.NoSuchElementException;
 import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.atomic.AtomicReferenceArray;
-
 import org.apache.logging.log4j.Logger;
 
-import com.gemstone.gemfire.DataSerializer;
 import com.gemstone.gemfire.LogWriter;
-import com.gemstone.gemfire.OutOfOffHeapMemoryException;
 import com.gemstone.gemfire.cache.CacheClosedException;
 import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.internal.DSCODE;
-import com.gemstone.gemfire.internal.DataSerializableFixedID;
-import com.gemstone.gemfire.internal.HeapDataOutputStream;
-import com.gemstone.gemfire.internal.InternalDataSerializer;
 import com.gemstone.gemfire.internal.cache.BucketRegion;
-import com.gemstone.gemfire.internal.cache.BytesAndBitsForCompactor;
-import com.gemstone.gemfire.internal.cache.CachedDeserializableFactory;
-import com.gemstone.gemfire.internal.cache.EntryBits;
-import com.gemstone.gemfire.internal.cache.EntryEventImpl;
 import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 import com.gemstone.gemfire.internal.cache.LocalRegion;
 import com.gemstone.gemfire.internal.cache.PartitionedRegion;
 import com.gemstone.gemfire.internal.cache.PartitionedRegionDataStore;
 import com.gemstone.gemfire.internal.cache.RegionEntry;
-import com.gemstone.gemfire.internal.cache.RegionEntryContext;
-import com.gemstone.gemfire.internal.lang.StringUtils;
 import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.Chunk;
-import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.ConcurrentBag.Node;
 import com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier;
 import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
-import com.gemstone.gemfire.internal.shared.StringPrintWriter;
 
 /**
  * This allocator is somewhat like an Arena allocator.
@@ -95,7 +56,7 @@ import com.gemstone.gemfire.internal.shared.StringPrintWriter;
  */
 public final class SimpleMemoryAllocatorImpl implements MemoryAllocator, MemoryInspector {
 
-  private static final Logger logger = LogService.getLogger();
+  static final Logger logger = LogService.getLogger();
   
   public static final String FREE_OFF_HEAP_MEMORY_PROPERTY = "gemfire.free-off-heap-memory";
   
@@ -118,9 +79,9 @@ public final class SimpleMemoryAllocatorImpl implements MemoryAllocator, MemoryI
   public final static int MAX_TINY = TINY_MULTIPLE*TINY_FREE_LIST_COUNT;
   public final static int HUGE_MULTIPLE = 256;
   
-  private volatile OffHeapMemoryStats stats;
+  volatile OffHeapMemoryStats stats;
   
-  private volatile OutOfOffHeapMemoryListener ooohml;
+  volatile OutOfOffHeapMemoryListener ooohml;
   
   /** The MemoryChunks that this allocator is managing by allocating smaller chunks of them.
    * The contents of this array never change.
@@ -135,7 +96,7 @@ public final class SimpleMemoryAllocatorImpl implements MemoryAllocator, MemoryI
   
   private static SimpleMemoryAllocatorImpl singleton = null;
   private static final AtomicReference<Thread> asyncCleanupThread = new AtomicReference<Thread>();
-  private final ChunkFactory chunkFactory;
+  final ChunkFactory chunkFactory;
   
   public static SimpleMemoryAllocatorImpl getAllocator() {
     SimpleMemoryAllocatorImpl result = singleton;
@@ -157,7 +118,7 @@ public final class SimpleMemoryAllocatorImpl implements MemoryAllocator, MemoryI
       result.reuse(ooohml, lw, stats, offHeapMemorySize);
       lw.config("Reusing " + result.getTotalMemory() + " bytes of off-heap memory. The maximum size of a single off-heap object is " + result.largestSlab + " bytes.");
       created = true;
-      invokeAfterReuse(result);
+      LifecycleListener.invokeAfterReuse(result);
     } else {
       // allocate memory chunks
       //SimpleMemoryAllocatorImpl.cleanupPreviousAllocator();
@@ -189,7 +150,7 @@ public final class SimpleMemoryAllocatorImpl implements MemoryAllocator, MemoryI
       result = new SimpleMemoryAllocatorImpl(ooohml, stats, slabs);
       created = true;
       singleton = result;
-      invokeAfterCreate(result);
+      LifecycleListener.invokeAfterCreate(result);
     }
     } finally {
       if (!created) {
@@ -203,7 +164,7 @@ public final class SimpleMemoryAllocatorImpl implements MemoryAllocator, MemoryI
   public static SimpleMemoryAllocatorImpl create(OutOfOffHeapMemoryListener oooml, OffHeapMemoryStats stats, UnsafeMemoryChunk[] slabs) {
     SimpleMemoryAllocatorImpl result = new SimpleMemoryAllocatorImpl(oooml, stats, slabs);
     singleton = result;
-    invokeAfterCreate(result);
+    LifecycleListener.invokeAfterCreate(result);
     return result;
   }
   
@@ -323,7 +284,7 @@ public final class SimpleMemoryAllocatorImpl implements MemoryAllocator, MemoryI
     this.stats.incMaxMemory(this.totalSlabSize);
     this.stats.incFreeMemory(this.totalSlabSize);
     
-    this.freeList = new FreeListManager();
+    this.freeList = new FreeListManager(this);
   }
   
   public List<Chunk> getLostChunks() {
@@ -401,190 +362,12 @@ public final class SimpleMemoryAllocatorImpl implements MemoryAllocator, MemoryI
     //System.out.println("allocating " + size);
     Chunk result = this.freeList.allocate(size, chunkType);
     //("allocated off heap object of size " + size + " @" + Long.toHexString(result.getMemoryAddress()), true);
-    if (trackReferenceCounts()) {
-      refCountChanged(result.getMemoryAddress(), false, 1);
+    if (ReferenceCountHelper.trackReferenceCounts()) {
+      ReferenceCountHelper.refCountChanged(result.getMemoryAddress(), false, 1);
     }
     return result;
   }
   
-  /**
-   * Used to represent offheap addresses whose
-   * value encodes actual data instead a memory
-   * location.
-   * Instances of this class have a very short lifetime.
-   * 
-   * @author darrel
-   *
-   */
-  public static class DataAsAddress implements StoredObject {
-    private final long address;
-    
-    public DataAsAddress(long addr) {
-      this.address = addr;
-    }
-    
-    public long getEncodedAddress() {
-      return this.address;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (o instanceof DataAsAddress) {
-        return getEncodedAddress() == ((DataAsAddress) o).getEncodedAddress();
-      }
-      return false;
-    }
-    
-    @Override
-    public int hashCode() {
-      long value = getEncodedAddress();
-      return (int)(value ^ (value >>> 32));
-    }
-
-    @Override
-    public int getSizeInBytes() {
-      return 0;
-    }
-
-    public byte[] getDecompressedBytes(RegionEntryContext r) {
-      return OffHeapRegionEntryHelper.encodedAddressToBytes(this.address, true, r);
-    }
-
-    /**
-     * If we contain a byte[] return it.
-     * Otherwise return the serialize bytes in us in a byte array.
-     */
-    public byte[] getRawBytes() {
-      return OffHeapRegionEntryHelper.encodedAddressToRawBytes(this.address);
-    }
-    
-    @Override
-    public byte[] getSerializedValue() {
-      return OffHeapRegionEntryHelper.encodedAddressToBytes(this.address);
-    }
-
-    @Override
-    public Object getDeserializedValue(Region r, RegionEntry re) {
-      return OffHeapRegionEntryHelper.encodedAddressToObject(this.address);
-    }
-
-    @Override
-    public Object getDeserializedForReading() {
-      return getDeserializedValue(null,null);
-    }
-    
-    @Override
-    public Object getValueAsDeserializedHeapObject() {
-      return getDeserializedValue(null,null);
-    }
-    
-    @Override
-    public byte[] getValueAsHeapByteArray() {
-      if (isSerialized()) {
-        return getSerializedValue();
-      } else {
-        return (byte[])getDeserializedForReading();
-      }
-    }
-
-    @Override
-    public String getStringForm() {
-      try {
-        return StringUtils.forceToString(getDeserializedForReading());
-      } catch (RuntimeException ex) {
-        return "Could not convert object to string because " + ex;
-      }
-    }
-
-    @Override
-    public Object getDeserializedWritableCopy(Region r, RegionEntry re) {
-      return getDeserializedValue(null,null);
-    }
-
-    @Override
-    public Object getValue() {
-      if (isSerialized()) {
-        return getSerializedValue();
-      } else {
-        throw new IllegalStateException("Can not call getValue on StoredObject that is not serialized");
-      }
-    }
-
-    @Override
-    public void writeValueAsByteArray(DataOutput out) throws IOException {
-      DataSerializer.writeByteArray(getSerializedValue(), out);
-    }
-
-    @Override
-    public void fillSerializedValue(BytesAndBitsForCompactor wrapper,
-        byte userBits) {
-      byte[] value;
-      if (isSerialized()) {
-        value = getSerializedValue();
-        userBits = EntryBits.setSerialized(userBits, true);
-      } else {
-        value = (byte[]) getDeserializedForReading();
-      }
-      wrapper.setData(value, userBits, value.length, true);
-    }
-
-    @Override
-    public int getValueSizeInBytes() {
-      return 0;
-    }
-    
-    @Override
-    public void sendTo(DataOutput out) throws IOException {
-      if (isSerialized()) {
-        out.write(getSerializedValue());
-      } else {
-        Object objToSend = (byte[]) getDeserializedForReading(); // deserialized as a byte[]
-        DataSerializer.writeObject(objToSend, out);
-      }
-    }
-
-    @Override
-    public void sendAsByteArray(DataOutput out) throws IOException {
-      byte[] bytes;
-      if (isSerialized()) {
-        bytes = getSerializedValue();
-      } else {
-        bytes = (byte[]) getDeserializedForReading();
-      }
-      DataSerializer.writeByteArray(bytes, out);
-      
-    }
-    
-    @Override
-    public void sendAsCachedDeserializable(DataOutput out) throws IOException {
-      if (!isSerialized()) {
-        throw new IllegalStateException("sendAsCachedDeserializable can only be called on serialized StoredObjects");
-      }
-      InternalDataSerializer.writeDSFIDHeader(DataSerializableFixedID.VM_CACHED_DESERIALIZABLE, out);
-      sendAsByteArray(out);
-    }
-
-    @Override
-    public boolean isSerialized() {
-      return OffHeapRegionEntryHelper.isSerialized(this.address);
-    }
-
-    @Override
-    public boolean isCompressed() {
-      return OffHeapRegionEntryHelper.isCompressed(this.address);
-    }
-    
-    @Override
-    public boolean retain() {
-      // nothing needed
-      return true;
-    }
-    @Override
-    public void release() {
-      // nothing needed
-    }
-  }
-
   @SuppressWarnings("unused")
   public static void debugLog(String msg, boolean logStack) {
     if (logStack) {
@@ -607,8 +390,8 @@ public final class SimpleMemoryAllocatorImpl implements MemoryAllocator, MemoryI
     Chunk result = this.freeList.allocate(v.length, chunkType);
     //debugLog("allocated off heap object of size " + v.length + " @" + Long.toHexString(result.getMemoryAddress()), true);
     //debugLog("allocated off heap object of size " + v.length + " @" + Long.toHexString(result.getMemoryAddress()) +  "chunkSize=" + result.getSize() + " isSerialized=" + isSerialized + " v=" + Arrays.toString(v), true);
-    if (trackReferenceCounts()) {
-      refCountChanged(result.getMemoryAddress(), false, 1);
+    if (ReferenceCountHelper.trackReferenceCounts()) {
+      ReferenceCountHelper.refCountChanged(result.getMemoryAddress(), false, 1);
     }
     assert result.getChunkType() == chunkType: "chunkType=" + chunkType + " getChunkType()=" + result.getChunkType();
     result.setSerializedValue(v);
@@ -635,7 +418,7 @@ public final class SimpleMemoryAllocatorImpl implements MemoryAllocator, MemoryI
   @Override
   public void close() {
     try {
-      invokeBeforeClose(this);
+      LifecycleListener.invokeBeforeClose(this);
     } finally {
       this.ooohml.close();
       if (Boolean.getBoolean(FREE_OFF_HEAP_MEMORY_PROPERTY)) {
@@ -710,7 +493,7 @@ public final class SimpleMemoryAllocatorImpl implements MemoryAllocator, MemoryI
 //    asyncCleanupThread.set(t);    
   }
   
-  private void freeChunk(long addr) {
+  void freeChunk(long addr) {
     this.freeList.free(addr);
   }
   
@@ -772,7 +555,7 @@ public final class SimpleMemoryAllocatorImpl implements MemoryAllocator, MemoryI
     }
   }
   
-  private void notifyListeners() {
+  void notifyListeners() {
     final MemoryUsageListener[] savedListeners = this.memoryUsageListeners;
     
     if (savedListeners.length == 0) {
@@ -785,2391 +568,73 @@ public final class SimpleMemoryAllocatorImpl implements MemoryAllocator, MemoryI
     }
   }
   
-  public class FreeListManager {
-    private final AtomicReferenceArray<SyncChunkStack> tinyFreeLists = new AtomicReferenceArray<SyncChunkStack>(TINY_FREE_LIST_COUNT);
-    // Deadcoding the BIG stuff. Idea is to have a bigger TINY list by default
-//    /**
-//     * Every allocated chunk smaller than BIG_MULTIPLE*BIG_FREE_LIST_COUNT but that is not tiny will allocate a chunk of memory that is a multiple of this value.
-//     * Sizes are always rounded up to the next multiple of this constant
-//     * so internal fragmentation will be limited to BIG_MULTIPLE-1 bytes per allocation
-//     * and on average will be BIG_MULTIPLE/2 given a random distribution of size requests.
-//     */
-//    public final static int BIG_MULTIPLE = TINY_MULTIPLE*8;
-//    /**
-//     * Number of free lists to keep for big allocations.
-//     */
-//    private final static int BIG_FREE_LIST_COUNT = 2048;
-//    private final static int BIG_OFFSET = (MAX_TINY/BIG_MULTIPLE*BIG_MULTIPLE);
-//    public final static int MAX_BIG = (BIG_MULTIPLE*BIG_FREE_LIST_COUNT) + BIG_OFFSET;
-//    private final AtomicReferenceArray<ConcurrentChunkStack> bigFreeLists = new AtomicReferenceArray<ConcurrentChunkStack>(BIG_FREE_LIST_COUNT);
-    // hugeChunkSet is sorted by chunk size in ascending order. It will only contain chunks larger than MAX_TINY.
-    private final ConcurrentSkipListSet<Chunk> hugeChunkSet = new ConcurrentSkipListSet<Chunk>();
-    private final AtomicLong allocatedSize = new AtomicLong(0L);
-   
-    private int getNearestTinyMultiple(int size) {
-      return (size-1)/TINY_MULTIPLE;
-    }
-    public List<Chunk> getLiveChunks() {
-      ArrayList<Chunk> result = new ArrayList<Chunk>();
-      UnsafeMemoryChunk[] slabs = getSlabs();
-      for (int i=0; i < slabs.length; i++) {
-        getLiveChunks(slabs[i], result);
-      }
-      return result;
-    }
-    private void getLiveChunks(UnsafeMemoryChunk slab, List<Chunk> result) {
-      long addr = slab.getMemoryAddress();
-      while (addr <= (slab.getMemoryAddress() + slab.getSize() - Chunk.MIN_CHUNK_SIZE)) {
-        Fragment f = isAddrInFragmentFreeSpace(addr);
-        if (f != null) {
-          addr = f.getMemoryAddress() + f.getSize();
-        } else {
-          int curChunkSize = Chunk.getSize(addr);
-          int refCount = Chunk.getRefCount(addr);
-          if (refCount > 0) {
-            result.add(SimpleMemoryAllocatorImpl.this.chunkFactory.newChunk(addr));
-          }
-          addr += curChunkSize;
-        }
-      }
-    }
-    /**
-     * If addr is in the free space of a fragment then return that fragment; otherwise return null.
-     */
-    private Fragment isAddrInFragmentFreeSpace(long addr) {
-      for (Fragment f: this.fragmentList) {
-        if (addr >= (f.getMemoryAddress() + f.getFreeIndex()) && addr < (f.getMemoryAddress() + f.getSize())) {
-          return f;
-        }
-      }
-      return null;
-    }
-    public long getUsedMemory() {
-      return this.allocatedSize.get();
-    }
-    public long getFreeMemory() {
-      return getTotalMemory() - getUsedMemory();
-//      long result = getFreeFragmentMemory();
-//      result += getFreeTinyMemory();
-//      result += getFreeHugeMemory();
-//      return result;
-    }
-    public long getFreeFragmentMemory() {
-      long result = 0;
-      for (Fragment f: this.fragmentList) {
-        int freeSpace = f.freeSpace();
-        if (freeSpace >= Chunk.MIN_CHUNK_SIZE) {
-          result += freeSpace;
-        }
-      }
-      return result;
-    }
-    public long getFreeTinyMemory() {
-      long tinyFree = 0;
-      for (int i=0; i < this.tinyFreeLists.length(); i++) {
-        SyncChunkStack cl = this.tinyFreeLists.get(i);
-        if (cl != null) {
-          tinyFree += cl.computeTotalSize();
-        }
-      }
-      return tinyFree;
-    }
-//    public long getFreeBigMemory() {
-//      long bigFree = 0;
-//      for (int i=0; i < this.bigFreeLists.length(); i++) {
-//        ConcurrentChunkStack cl = this.bigFreeLists.get(i);
-//        if (cl != null) {
-//          bigFree += cl.computeTotalSize();
-//        }
-//      }
-//      return bigFree;
-//    }
-    public long getFreeHugeMemory() {
-      long hugeFree = 0;
-      for (Chunk c: this.hugeChunkSet) {
-        hugeFree += c.getSize();
-      }
-      return hugeFree;
-    }
-//    private int getNearestBigMultiple(int size) {
-//      return (size-1-BIG_OFFSET)/BIG_MULTIPLE;
-//    }
-
-    /**
-     * Each long in this array tells us how much of the corresponding slab is allocated.
-     */
-    //private final AtomicIntegerArray slabOffsets = new AtomicIntegerArray(getSlabs().length);
-    /**
-     * The slab id of the last slab we allocated from.
-     */
-    private final AtomicInteger lastFragmentAllocation = new AtomicInteger(0);
-
-    private final CopyOnWriteArrayList<Fragment> fragmentList;
-    public FreeListManager() {
-      UnsafeMemoryChunk[] slabs = getSlabs();
-      Fragment[] tmp = new Fragment[slabs.length];
-      for (int i=0; i < slabs.length; i++) {
-        tmp[i] = new Fragment(slabs[i].getMemoryAddress(), slabs[i].getSize());
-      }
-      this.fragmentList = new CopyOnWriteArrayList<Fragment>(tmp);
-      
-      if(validateMemoryWithFill) {
-        fillFragments();
-      }
-    }
-    
-    /**
-     * Fills all fragments with a fill used for data integrity validation.
-     */
-    private void fillFragments() {
-      for(Fragment fragment : this.fragmentList) {
-        fragment.fill();
-      }
-    }
-    
-    /**
-     * This is a bit of a hack. TODO add some timeout logic in case this thread never does another off heap allocation.
-     */
-//    private final ThreadLocal<Chunk> tlCache = new ThreadLocal<Chunk>();
-    
-    /**
-     * Allocate a chunk of memory of at least the given size.
-     * The basic algorithm is:
-     * 1. Look for a previously allocated and freed chunk close to the size requested.
-     * 2. See if the original chunk is big enough to split. If so do so.
-     * 3. Look for a previously allocated and freed chunk of any size larger than the one requested.
-     *    If we find one split it.
-     * <p>
-     * It might be better not to include step 3 since we expect and freed chunk to be reallocated in the future.
-     * Maybe it would be better for 3 to look for adjacent free blocks that can be merged together.
-     * For now we will just try 1 and 2 and then report out of mem.
-     * @param size minimum bytes the returned chunk must have.
-     * @param chunkType TODO
-     * @return the allocated chunk
-     * @throws IllegalStateException if a chunk can not be allocated.
-     */
-    @SuppressWarnings("synthetic-access")
-    public Chunk allocate(int size, ChunkType chunkType) {
-      Chunk result = null; /*tlCache.get();
-      
-      if (result != null && result.getDataSize() == size) {
-        tlCache.set(null);
-      } else */{
-        assert size > 0;
-        if (chunkType == null) {
-          chunkType = GemFireChunk.TYPE;
-        }
-        result = basicAllocate(size, true, chunkType);
-        result.setDataSize(size);
-      }
-      stats.incObjects(1);
-      int resultSize = result.getSize();
-      this.allocatedSize.addAndGet(resultSize);
-      stats.incUsedMemory(resultSize);
-      stats.incFreeMemory(-resultSize);
-      result.initializeUseCount();
-      notifyListeners();
-      
-      return result;
-    }
-    
-    private Chunk basicAllocate(int size, boolean useSlabs, ChunkType chunkType) {
-      if (useSlabs) {
-        // Every object stored off heap has a header so we need
-        // to adjust the size so that the header gets allocated.
-        // If useSlabs is false then the incoming size has already
-        // been adjusted.
-        size += Chunk.OFF_HEAP_HEADER_SIZE;
-      }
-      if (size <= MAX_TINY) {
-        return allocateTiny(size, useSlabs, chunkType);
-//      } else if (size <= MAX_BIG) {
-//        return allocateBig(size, useSlabs);
-      } else {
-        return allocateHuge(size, useSlabs, chunkType);
-      }
-    }
-    
-    private Chunk allocateFromFragments(int chunkSize, ChunkType chunkType) {
-      do {
-        final int lastAllocationId = this.lastFragmentAllocation.get();
-        for (int i=lastAllocationId; i < this.fragmentList.size(); i++) {
-          Chunk result = allocateFromFragment(i, chunkSize, chunkType);
-          if (result != null) {
-            return result;
-          }
-        }
-        for (int i=0; i < lastAllocationId; i++) {
-          Chunk result = allocateFromFragment(i, chunkSize, chunkType);
-          if (result != null) {
-            return result;
-          }
-        }
-      } while (compact(chunkSize));
-      // We tried all the fragments and didn't find any free memory.
-      logOffHeapState(chunkSize);
-      final OutOfOffHeapMemoryException failure = new OutOfOffHeapMemoryException("Out of off-heap memory. Could not allocate size of " + chunkSize);
-      try {
-        throw failure;
-      } finally {
-        SimpleMemoryAllocatorImpl.this.ooohml.outOfOffHeapMemory(failure);
-      }
-    }
-    
-    private void logOffHeapState(int chunkSize) {
-      if (InternalDistributedSystem.getAnyInstance() != null) {
-        LogWriter lw = InternalDistributedSystem.getAnyInstance().getLogWriter();
-        lw.info("OutOfOffHeapMemory allocating size of " + chunkSize + ". allocated=" + this.allocatedSize.get() + " compactions=" + this.compactCount.get() + " objects=" + stats.getObjects() + " free=" + stats.getFreeMemory() + " fragments=" + stats.getFragments() + " largestFragment=" + stats.getLargestFragment() + " fragmentation=" + stats.getFragmentation());
-        logFragmentState(lw);
-        logTinyState(lw);
-//        logBigState(lw);
-        logHugeState(lw);
-      }
-    }
-
-    private void logHugeState(LogWriter lw) {
-      for (Chunk c: this.hugeChunkSet) {
-        lw.info("Free huge of size " + c.getSize());
-      }
-    }
-//    private void logBigState(LogWriter lw) {
-//      for (int i=0; i < this.bigFreeLists.length(); i++) {
-//        ConcurrentChunkStack cl = this.bigFreeLists.get(i);
-//        if (cl != null) {
-//          cl.logSizes(lw, "Free big of size ");
-//        }
-//      }
-//    }
-    private void logTinyState(LogWriter lw) {
-      for (int i=0; i < this.tinyFreeLists.length(); i++) {
-        SyncChunkStack cl = this.tinyFreeLists.get(i);
-        if (cl != null) {
-          cl.logSizes(lw, "Free tiny of size ");
+  static void validateAddress(long addr) {
+    validateAddressAndSize(addr, -1);
+  }
+  
+  static void validateAddressAndSize(long addr, int size) {
+    // if the caller does not have a "size" to provide then use -1
+    if ((addr & 7) != 0) {
+      StringBuilder sb = new StringBuilder();
+      sb.append("address was not 8 byte aligned: 0x").append(Long.toString(addr, 16));
+      SimpleMemoryAllocatorImpl ma = SimpleMemoryAllocatorImpl.singleton;
+      if (ma != null) {
+        sb.append(". Valid addresses must be in one of the following ranges: ");
+        for (int i=0; i < ma.slabs.length; i++) {
+          long startAddr = ma.slabs[i].getMemoryAddress();
+          long endAddr = startAddr + ma.slabs[i].getSize();
+          sb.append("[").append(Long.toString(startAddr, 16)).append("..").append(Long.toString(endAddr, 16)).append("] ");
         }
       }
+      throw new IllegalStateException(sb.toString());
     }
-    private void logFragmentState(LogWriter lw) {
-      for (Fragment f: this.fragmentList) {
-        int freeSpace = f.freeSpace();
-        if (freeSpace > 0) {
-          lw.info("Fragment at " + f.getMemoryAddress() + " of size " + f.getSize() + " has " + freeSpace + " bytes free.");
-        }
-      }
+    if (addr >= 0 && addr < 1024) {
+      throw new IllegalStateException("addr was smaller than expected 0x" + addr);
     }
+    validateAddressAndSizeWithinSlab(addr, size);
+  }
 
-    private final AtomicInteger compactCount = new AtomicInteger();
-    /**
-     * Compacts memory and returns true if enough memory to allocate chunkSize
-     * is freed. Otherwise returns false;
-     * TODO OFFHEAP: what should be done about contiguous chunks that end up being bigger than 2G?
-     * Currently if we are given slabs bigger than 2G or that just happen to be contiguous and add
-     * up to 2G then the compactor may unify them together into a single Chunk and our 32-bit chunkSize
-     * field will overflow. This code needs to detect this and just create a chunk of 2G and then start
-     * a new one.
-     * Or to prevent it from happening we could just check the incoming slabs and throw away a few bytes
-     * to keep them from being contiguous.
-     */
-    private boolean compact(int chunkSize) {
-      final long startCompactionTime = getStats().startCompaction();
-      final int countPreSync = this.compactCount.get();
-      try {
-        synchronized (this) {
-          if (this.compactCount.get() != countPreSync) {
-            // someone else did a compaction while we waited on the sync.
-            // So just return true causing the caller to retry the allocation.
-            return true;
-          }
-          ArrayList<SyncChunkStack> freeChunks = new ArrayList<SyncChunkStack>();
-          collectFreeChunks(freeChunks);
-          final int SORT_ARRAY_BLOCK_SIZE = 128;
-          long[] sorted = new long[SORT_ARRAY_BLOCK_SIZE];
-          int sortedSize = 0;
-          boolean result = false;
-          int largestFragment = 0;
-          for (SyncChunkStack l: freeChunks) {
-            long addr = l.poll();
-            while (addr != 0) {
-              int idx = Arrays.binarySearch(sorted, 0, sortedSize, addr);
-              //System.out.println("DEBUG addr=" + addr + " size=" + Chunk.getSize(addr) + " idx="+idx + " sortedSize=" + sortedSize);
-              if (idx >= 0) {
-                throw new IllegalStateException("duplicate memory address found during compaction!");
-              }
-              idx = -idx;
-              idx--;
-              if (idx == sortedSize) {
-                // addr is > everything in the array
-                if (sortedSize == 0) {
-                  // nothing was in the array
-                  sorted[0] = addr;
-                  sortedSize++;
-                } else {
-                  // see if we can conflate into sorted[idx]
-                  long lowAddr = sorted[idx-1];
-                  int lowSize = Chunk.getSize(lowAddr);
-                  if (lowAddr + lowSize == addr) {
-                    // append the addr chunk to lowAddr
-                    Chunk.setSize(lowAddr, lowSize + Chunk.getSize(addr));
-                  } else {
-                    if (sortedSize >= sorted.length) {
-                      long[] newSorted = new long[sorted.length+SORT_ARRAY_BLOCK_SIZE];
-                      System.arraycopy(sorted, 0, newSorted, 0, sorted.length);
-                      sorted = newSorted;
-                    }
-                    sortedSize++;
-                    sorted[idx] = addr;
-                  }
-                }
-              } else {
-                int addrSize = Chunk.getSize(addr);
-                long highAddr = sorted[idx];
-                if (addr + addrSize == highAddr) {
-                  // append highAddr chunk to addr
-                  Chunk.setSize(addr, addrSize + Chunk.getSize(highAddr));
-                  sorted[idx] = addr;
-                } else {
-                  boolean insert = idx==0;
-                  if (!insert) {
-                    long lowAddr = sorted[idx-1];
-  //                  if (lowAddr == 0L) {
-  //                    long[] tmp = Arrays.copyOf(sorted, sortedSize);
-  //                    throw new IllegalStateException("addr was zero at idx=" + (idx-1) + " sorted="+ Arrays.toString(tmp));
-  //                  }
-                    int lowSize = Chunk.getSize(lowAddr);
-                    if (lowAddr + lowSize == addr) {
-                      // append the addr chunk to lowAddr
-                      Chunk.setSize(lowAddr, lowSize + addrSize);
-                    } else {
-                      insert = true;
-                    }
-                  }
-                  if (insert) {
-                    if (sortedSize >= sorted.length) {
-                      long[] newSorted = new long[sorted.length+SORT_ARRAY_BLOCK_SIZE];
-                      System.arraycopy(sorted, 0, newSorted, 0, idx);
-                      newSorted[idx] = addr;
-                      System.arraycopy(sorted, idx, newSorted, idx+1, sortedSize-idx);
-                      sorted = newSorted;
-                    } else {
-                      System.arraycopy(sorted, idx, sorted, idx+1, sortedSize-idx);
-                      sorted[idx] = addr;
-                    }
-                    sortedSize++;
-                  }
-                }
+  static void validateAddressAndSizeWithinSlab(long addr, int size) {
+    if (DO_EXPENSIVE_VALIDATION) {
+      SimpleMemoryAllocatorImpl ma = SimpleMemoryAllocatorImpl.singleton;
+      if (ma != null) {
+        for (int i=0; i < ma.slabs.length; i++) {
+          if (ma.slabs[i].getMemoryAddress() <= addr && addr < (ma.slabs[i].getMemoryAddress() + ma.slabs[i].getSize())) {
+            // validate addr + size is within the same slab
+            if (size != -1) { // skip this check if size is -1
+              if (!(ma.slabs[i].getMemoryAddress() <= (addr+size-1) && (addr+size-1) < (ma.slabs[i].getMemoryAddress() + ma.slabs[i].getSize()))) {
+                throw new IllegalStateException(" address 0x" + Long.toString(addr+size-1, 16) + " does not address the original slab memory");
               }
-              addr = l.poll();
-            }
-          }
-          for (int i=sortedSize-1; i > 0; i--) {
-            long addr = sorted[i];
-            long lowAddr = sorted[i-1];
-            int lowSize = Chunk.getSize(lowAddr);
-            if (lowAddr + lowSize == addr) {
-              // append addr chunk to lowAddr
-              Chunk.setSize(lowAddr, lowSize + Chunk.getSize(addr));
-              sorted[i] = 0L;
-            }
-          }
-          this.lastFragmentAllocation.set(0);
-          ArrayList<Fragment> tmp = new ArrayList<Fragment>();
-          for (int i=sortedSize-1; i >= 0; i--) {
-            long addr = sorted[i];
-            if (addr == 0L) continue;
-            int addrSize = Chunk.getSize(addr);
-            Fragment f = new Fragment(addr, addrSize);
-            if (addrSize >= chunkSize) {
-              result = true;
-            }
-            if (addrSize > largestFragment) {
-              largestFragment = addrSize;
-              // TODO it might be better to sort them biggest first
-              tmp.add(0, f);
-            } else {
-              tmp.add(f);
-            }
-          }
-          this.fragmentList.addAll(tmp);
-          
-          // Reinitialize fragments with fill pattern data
-          if(validateMemoryWithFill) {
-            fillFragments();
-          }
-          
-          // Signal any waiters that a compaction happened.
-          this.compactCount.incrementAndGet();
-          
-          getStats().setLargestFragment(largestFragment);
-          getStats().setFragments(tmp.size());        
-          updateFragmentation();
-          
-          return result;
-        } // sync
-      } finally {
-        getStats().endCompaction(startCompactionTime);
-      }
-    }
-    
-    private void updateFragmentation() {      
-      long freeSize = getStats().getFreeMemory();
-
-      // Calculate free space fragmentation only if there is free space available.
-      if(freeSize > 0) {
-        long largestFragment = getStats().getLargestFragment();
-        long numerator = freeSize - largestFragment;
-        
-        double percentage = (double) numerator / (double) freeSize;
-        percentage *= 100d;
-        
-        int wholePercentage = (int) Math.rint(percentage);
-        getStats().setFragmentation(wholePercentage);
-      } else {
-        // No free space? Then we have no free space fragmentation.
-        getStats().setFragmentation(0);
-      }
-    }
-    
-    private void collectFreeChunks(List<SyncChunkStack> l) {
-      collectFreeFragmentChunks(l);
-      collectFreeHugeChunks(l);
-//      collectFreeBigChunks(l);
-      collectFreeTinyChunks(l);
-    }
-    private void collectFreeFragmentChunks(List<SyncChunkStack> l) {
-      if (this.fragmentList.size() == 0) return;
-      SyncChunkStack result = new SyncChunkStack();
-      for (Fragment f: this.fragmentList) {
-        int offset;
-        int diff;
-        do {
-          offset = f.getFreeIndex();
-          diff = f.getSize() - offset;
-        } while (diff >= Chunk.MIN_CHUNK_SIZE && !f.allocate(offset, offset+diff));
-        if (diff < Chunk.MIN_CHUNK_SIZE) {
-          if (diff > 0) {
-            logger.debug("Lost memory of size {}", diff);
-          }
-          // fragment is too small to turn into a chunk
-          // TODO we need to make sure this never happens
-          // by keeping sizes rounded. I think I did this
-          // by introducing MIN_CHUNK_SIZE and by rounding
-          // the size of huge allocations.
-          continue;
-        }
-        long chunkAddr = f.getMemoryAddress()+offset;
-        Chunk.setSize(chunkAddr, diff);
-        result.offer(chunkAddr);
-      }
-      // All the fragments have been turned in to chunks so now clear them
-      // The compaction will create new fragments.
-      this.fragmentList.clear();
-      if (!result.isEmpty()) {
-        l.add(result);
-      }
-    }
-    private void collectFreeTinyChunks(List<SyncChunkStack> l) {
-      for (int i=0; i < this.tinyFreeLists.length(); i++) {
-        SyncChunkStack cl = this.tinyFreeLists.get(i);
-        if (cl != null) {
-          long head = cl.clear();
-          if (head != 0L) {
-            l.add(new SyncChunkStack(head));
-          }
-        }
-      }
-    }
-//    private void collectFreeBigChunks(List<ConcurrentChunkStack> l) {
-//      for (int i=0; i < this.bigFreeLists.length(); i++) {
-//        ConcurrentChunkStack cl = this.bigFreeLists.get(i);
-//        if (cl != null) {
-//          long head = cl.clear();
-//          if (head != 0L) {
-//            l.add(new ConcurrentChunkStack(head));
-//          }
-//        }
-//      }
-//    }
-    public void collectFreeHugeChunks(List<SyncChunkStack> l) {
-      Chunk c = this.hugeChunkSet.pollFirst();
-      SyncChunkStack result = null;
-      while (c != null) {
-        if (result == null) {
-          result = new SyncChunkStack();
-          l.add(result);
-        }
-        result.offer(c.getMemoryAddress());
-        c = this.hugeChunkSet.pollFirst();
-      }
-    }
-    
-    private Chunk allocateFromFragment(final int fragIdx, final int chunkSize, ChunkType chunkType) {
-      if (fragIdx >= this.fragmentList.size()) return null;
-      final Fragment fragment;
-      try {
-        fragment = this.fragmentList.get(fragIdx);
-      } catch (IndexOutOfBoundsException ignore) {
-        // A concurrent compaction can cause this.
-        return null;
-      }
-      boolean retryFragment;
-      do {
-        retryFragment = false;
-        int oldOffset = fragment.getFreeIndex();
-        int fragmentSize = fragment.getSize();
-        int fragmentFreeSize = fragmentSize - oldOffset;
-        if (fragmentFreeSize >= chunkSize) {
-          // this fragment has room
-          // Try to allocate up to BATCH_SIZE more chunks from it
-          int allocSize = chunkSize * BATCH_SIZE;
-          if (allocSize > fragmentFreeSize) {
-            allocSize = (fragmentFreeSize / chunkSize) * chunkSize;
-          }
-          int newOffset = oldOffset + allocSize;
-          int extraSize = fragmentSize - newOffset;
-          if (extraSize < Chunk.MIN_CHUNK_SIZE) {
-            // include these last few bytes of the fragment in the allocation.
-            // If we don't then they will be lost forever.
-            // The extraSize bytes only apply to the first chunk we allocate (not the batch ones).
-            newOffset += extraSize;
-          } else {
-            extraSize = 0;
-          }
-          if (fragment.allocate(oldOffset, newOffset)) {
-            // We did the allocate!
-            this.lastFragmentAllocation.set(fragIdx);
-            Chunk result = chunkFactory.newChunk(fragment.getMemoryAddress()+oldOffset, chunkSize+extraSize, chunkType);
-            allocSize -= chunkSize+extraSize;
-            oldOffset += extraSize;
-            while (allocSize > 0) {
-              oldOffset += chunkSize;
-              // we add the batch ones immediately to the freelist
-              result.readyForFree();
-              free(result.getMemoryAddress(), false);
-              result = chunkFactory.newChunk(fragment.getMemoryAddress()+oldOffset, chunkSize, chunkType);
-              allocSize -= chunkSize;
-            }
-            
-            if(validateMemoryWithFill) {
-              result.validateFill();
             }
-            
-            return result;
-          } else {
-            // TODO OFFHEAP: if batch allocations are disabled should we not call basicAllocate here?
-            // Since we know another thread did a concurrent alloc
-            // that possibly did a batch check the free list again.
-            Chunk result = basicAllocate(chunkSize, false, chunkType);
-            if (result != null) {
-              return result;
-            }
-            retryFragment = true;
-          }
-        }
-      } while (retryFragment);
-      return null; // did not find enough free space in this fragment
-    }
-
-    private int round(int multiple, int value) {
-      return (int) ((((long)value + (multiple-1)) / multiple) * multiple);
-    }
-    private Chunk allocateTiny(int size, boolean useFragments, ChunkType chunkType) {
-      return basicAllocate(getNearestTinyMultiple(size), TINY_MULTIPLE, 0, this.tinyFreeLists, useFragments, chunkType);
-    }
-//    private Chunk allocateBig(int size, boolean useFragments) {
-//      return basicAllocate(getNearestBigMultiple(size), BIG_MULTIPLE, BIG_OFFSET, this.bigFreeLists, useFragments);
-//    }
-    private Chunk basicAllocate(int idx, int multiple, int offset, AtomicReferenceArray<SyncChunkStack> freeLists, boolean useFragments, ChunkType chunkType) {
-      SyncChunkStack clq = freeLists.get(idx);
-      if (clq != null) {
-        long memAddr = clq.poll();
-        if (memAddr != 0) {
-          Chunk result = SimpleMemoryAllocatorImpl.this.chunkFactory.newChunk(memAddr, chunkType);
-          
-          // Data integrity check.
-          if(validateMemoryWithFill) {          
-            result.validateFill();
-          }
-          
-          result.readyForAllocation(chunkType);
-          return result;
-        }
-      }
-      if (useFragments) {
-        return allocateFromFragments(((idx+1)*multiple)+offset, chunkType);
-      } else {
-        return null;
-      }
-    }
-    private Chunk allocateHuge(int size, boolean useFragments, ChunkType chunkType) {
-      // sizeHolder is a fake Chunk used to search our sorted hugeChunkSet.
-      Chunk sizeHolder = newFakeChunk(size);
-      NavigableSet<Chunk> ts = this.hugeChunkSet.tailSet(sizeHolder);
-      Chunk result = ts.pollFirst();
-      if (result != null) {
-        if (result.getSize() - (HUGE_MULTIPLE - Chunk.OFF_HEAP_HEADER_SIZE) < size) {
-          // close enough to the requested size; just return it.
-          
-          // Data integrity check.
-          if(validateMemoryWithFill) {          
-            result.validateFill();
-          }
-          if (chunkType.getSrcType() != Chunk.getSrcType(result.getMemoryAddress())) {
-            // The java wrapper class that was cached in the huge chunk list is the wrong type.
-            // So allocate a new one and garbage collect the old one.
-            result = SimpleMemoryAllocatorImpl.this.chunkFactory.newChunk(result.getMemoryAddress(), chunkType);
+            return;
           }
-          result.readyForAllocation(chunkType);
-          return result;
-        } else {
-          this.hugeChunkSet.add(result);
-        }
-      }
-      if (useFragments) {
-        // We round it up to the next multiple of TINY_MULTIPLE to make
-        // sure we always have chunks allocated on an 8 byte boundary.
-        return allocateFromFragments(round(TINY_MULTIPLE, size), chunkType);
-      } else {
-        return null;
-      }
-    }
-    
-    @SuppressWarnings("synthetic-access")
-    public void free(long addr) {
-      free(addr, true);
-    }
-    
-    private void free(long addr, boolean updateStats) {
-      int cSize = Chunk.getSize(addr);
-      if (updateStats) {
-        stats.incObjects(-1);
-        this.allocatedSize.addAndGet(-cSize);
-        stats.incUsedMemory(-cSize);
-        stats.incFreeMemory(cSize);
-        notifyListeners();
-      }
-      /*Chunk oldTlChunk = this.tlCache.get();
-      this.tlCache.set(c);
-      if (oldTlChunk != null) {
-        int oldTlcSize = oldTlChunk.getSize();
-        if (oldTlcSize <= MAX_TINY) {
-          freeTiny(oldTlChunk);
-        } else if (oldTlcSize <= MAX_BIG) {
-          freeBig(oldTlChunk);
-        } else {
-          freeHuge(oldTlChunk);
-        }
-      }*/
-      if (cSize <= MAX_TINY) {
-        freeTiny(addr, cSize);
-//      } else if (cSize <= MAX_BIG) {
-//        freeBig(addr, cSize);
-      } else {
-        freeHuge(addr, cSize);
-      }
-    }
-    private void freeTiny(long addr, int cSize) {
-      basicFree(addr, getNearestTinyMultiple(cSize), this.tinyFreeLists);
-    }
-//    private void freeBig(long addr, int cSize) {
-//      basicFree(addr, getNearestBigMultiple(cSize), this.bigFreeLists);
-//    }
-    private void basicFree(long addr, int idx, AtomicReferenceArray<SyncChunkStack> freeLists) {
-      SyncChunkStack clq = freeLists.get(idx);
-      if (clq != null) {
-        clq.offer(addr);
-      } else {
-        clq = new SyncChunkStack();
-        clq.offer(addr);
-        if (!freeLists.compareAndSet(idx, null, clq)) {
-          clq = freeLists.get(idx);
-          clq.offer(addr);
         }
+        throw new IllegalStateException(" address 0x" + Long.toString(addr, 16) + " does not address the original slab memory");
       }
-      
-    }
-    private void freeHuge(long addr, int cSize) {
-      this.hugeChunkSet.add(SimpleMemoryAllocatorImpl.this.chunkFactory.newChunk(addr)); // TODO make this a collection of longs
     }
   }
   
+  /** The inspection snapshot for MemoryInspector */
+  private List<MemoryBlock> memoryBlocks;
   
-  
-  
-  /*private Chunk newChunk(long addr, int chunkSize) {
-    return this.chunkFactory.newChunk(addr, chunkSize);
-  }*/
-  
-  private Chunk newFakeChunk(int chunkSize) {
-    return new FakeChunk(chunkSize);
+  @Override
+  public MemoryInspector getMemoryInspector() {
+    return this;
   }
   
-  
-  public static interface ChunkFactory  {
-    /**
-     * Create a new chunk of the given size and type at the given address.
-     */
-    Chunk newChunk(long address, int chunkSize, ChunkType chunkType);
-    /**
-     * Create a new chunk for a block of memory (identified by address)
-     * that has already been allocated.
-     * The size and type are derived from the existing object header.
-     */
-    Chunk newChunk(long address);
-    /**
-     * Create a new chunk of the given type for a block of memory (identified by address)
-     * that has already been allocated.
-     * The size is derived from the existing object header.
-     */
-    Chunk newChunk(long address, ChunkType chunkType);
-    /**
-     * Given the address of an existing chunk return its ChunkType.
-     */
-    ChunkType getChunkTypeForAddress(long address);
-    /**
-     * Given the rawBits from the object header of an existing chunk
-     * return its ChunkType.
-     */
-    ChunkType getChunkTypeForRawBits(int bits);
+  @Override
+  public synchronized void clearInspectionSnapshot() {
+    this.memoryBlocks = null;
   }
   
-  private static class GemFireChunkFactory implements ChunkFactory {
-    @Override
-    public Chunk newChunk(long address, int chunkSize, ChunkType chunkType) {
-      assert chunkType.equals(GemFireChunk.TYPE);
-      return new GemFireChunk(address,chunkSize);
-    }
-
-    @Override
-    public Chunk newChunk(long address) {
-      return new GemFireChunk(address);
+  @Override
+  public synchronized void createInspectionSnapshot() {
+    List<MemoryBlock> value = this.memoryBlocks;
+    if (value == null) {
+      value = getOrderedBlocks();
+      this.memoryBlocks = value;
     }
-
-    @Override
-    public Chunk newChunk(long address, ChunkType chunkType) {
-      assert chunkType.equals(GemFireChunk.TYPE);
-      return new GemFireChunk(address);
-    }
-
-    @Override
-    public ChunkType getChunkTypeForAddress(long address) {
-      assert Chunk.getSrcType(address) == Chunk.SRC_TYPE_GFE;
-      return GemFireChunk.TYPE;
-    }
-
-    @Override
-    public ChunkType getChunkTypeForRawBits(int bits) {
-      assert Chunk.getSrcTypeFromRawBits(bits) == Chunk.SRC_TYPE_GFE;
-      return GemFireChunk.TYPE;
-    }
-  }
-  
-  
-  /**
-   * Used to keep the heapForm around while an operation is still in progress.
-   * This allows the operation to access the serialized heap form instead of copying
-   * it from offheap. See bug 48135.
-   * 
-   * @author darrel
-   *
-   */
-  public static class ChunkWithHeapForm extends GemFireChunk {
-    private final byte[] heapForm;
-    
-    public ChunkWithHeapForm(GemFireChunk chunk, byte[] heapForm) {
-      super(chunk);
-      this.heapForm = heapForm;
-    }
-
-    @Override
-    protected byte[] getRawBytes() {
-      return this.heapForm;
-    }
-    
-    public Chunk getChunkWithoutHeapForm() {
-      return new GemFireChunk(this);
-    }
-  }
-  
-  public static abstract class ChunkType {
-    public abstract int getSrcType();
-    public abstract Chunk newChunk(long memoryAddress);
-    public abstract Chunk newChunk(long memoryAddress, int chunkSize);
-  }
-  
-  public static class GemFireChunkType extends ChunkType {
-    private static final GemFireChunkType singleton = new GemFireChunkType();
-    public static GemFireChunkType singleton() { return singleton; }
-    
-    private GemFireChunkType() {}
-
-    @Override
-    public int getSrcType() {
-      return Chunk.SRC_TYPE_GFE;
-    }
-
-    @Override
-    public Chunk newChunk(long memoryAddress) {      
-      return new GemFireChunk(memoryAddress);
-    }
-
-    @Override
-    public Chunk newChunk(long memoryAddress, int chunkSize) {     
-      return new GemFireChunk(memoryAddress, chunkSize);
-    }
-  }
-  public static class GemFireChunk extends Chunk {
-    public static final ChunkType TYPE = new ChunkType() {
-      @Override
-      public int getSrcType() {
-        return Chunk.SRC_TYPE_GFE;
-      }
-      @Override
-      public Chunk newChunk(long memoryAddress) {
-        return new GemFireChunk(memoryAddress);
-      }
-      @Override
-      public Chunk newChunk(long memoryAddress, int chunkSize) {
-        return new GemFireChunk(memoryAddress, chunkSize);
-      }
-    };
-    public GemFireChunk(long memoryAddress, int chunkSize) {
-      super(memoryAddress, chunkSize, TYPE);
-    }
-
-    public GemFireChunk(long memoryAddress) {
-      super(memoryAddress);
-      // chunkType may be set by caller when it calls readyForAllocation
-    }
-    public GemFireChunk(GemFireChunk chunk) {
-      super(chunk);
-    }
-    @Override
-    public Chunk slice(int position, int limit) {
-      return new GemFireChunkSlice(this, position, limit);
-    }
-  }
-  public static class GemFireChunkSlice extends GemFireChunk {
-    private final int offset;
-    private final int capacity;
-    public GemFireChunkSlice(GemFireChunk gemFireChunk, int position, int limit) {
-      super(gemFireChunk);
-      this.offset = gemFireChunk.getBaseDataOffset() + position;
-      this.capacity = limit - position;
-    }
-    @Override
-    public int getDataSize() {
-      return this.capacity;
-    }
-    
-    @Override
-    protected long getBaseDataAddress() {
-      return super.getBaseDataAddress() + this.offset;
-    }
-    @Override
-    protected int getBaseDataOffset() {
-      return this.offset;
-    }
-  }
-  /**
-   * Note: this class has a natural ordering that is inconsistent with equals.
-   * Instances of this class should have a short lifetime. We do not store references
-   * to it in the cache. Instead the memoryAddress is stored in a primitive field in
-   * the cache and if used it will then, if needed, create an instance of this class.
-   */
-  public static abstract class Chunk extends OffHeapCachedDeserializable implements Comparable<Chunk>, ConcurrentBag.Node, MemoryBlock {
-    /**
-     * The unsafe memory address of the first byte of this chunk
-     */
-    private final long memoryAddress;
-    
-    /**
-     * The useCount, chunkSize, dataSizeDelta, isSerialized, and isCompressed
-     * are all stored in off heap memory in a HEADER. This saves heap memory
-     * by using off heap.
-     */
-    public final static int OFF_HEAP_HEADER_SIZE = 4 + 4;
-    /**
-     * We need to smallest chunk to at least have enough room for a hdr
-     * and for an off heap ref (which is a long).
-     */
-    public final static int MIN_CHUNK_SIZE = OFF_HEAP_HEADER_SIZE + 8;
-    /**
-     * int field.
-     * The number of bytes in this chunk.
-     */
-    private final static int CHUNK_SIZE_OFFSET = 0;
-    /**
-     * Volatile int field
-     * The upper two bits are used for the isSerialized
-     * and isCompressed flags.
-     * The next three bits are used to encode the SRC_TYPE enum.
-     * The lower 3 bits of the most significant byte contains a magic number to help us detect
-     * if we are changing the ref count of an object that has been released.
-     * The next byte contains the dataSizeDelta.
-     * The number of bytes of logical data in this chunk.
-     * Since the number of bytes of logical data is always <= chunkSize
-     * and since chunkSize never changes, we have dataSize be
-     * a delta whose max value would be HUGE_MULTIPLE-1.
-     * The lower two bytes contains the use count.
-     */
-    private final static int REF_COUNT_OFFSET = 4;
-    /**
-     * The upper two bits are used for the isSerialized
-     * and isCompressed flags.
-     */
-    private final static int IS_SERIALIZED_BIT =    0x80000000;
-    private final static int IS_COMPRESSED_BIT =    0x40000000;
-    private final static int SRC_TYPE_MASK = 0x38000000;
-    private final static int SRC_TYPE_SHIFT = 16/*refCount*/+8/*dataSize*/+3/*magicSize*/;
-    private final static int MAGIC_MASK = 0x07000000;
-    private final static int MAGIC_NUMBER = 0x05000000;
-    private final static int DATA_SIZE_DELTA_MASK = 0x00ff0000;
-    private final static int DATA_SIZE_SHIFT = 16;
-    private final static int REF_COUNT_MASK =       0x0000ffff;
-    private final static int MAX_REF_COUNT = 0xFFFF;
-    final static long FILL_PATTERN = 0x3c3c3c3c3c3c3c3cL;
-    final static byte FILL_BYTE = 0x3c;
-    
-    // The 8 bits reserved for SRC_TYPE are basically no longer used.
-    // So we could free up these 8 bits for some other use or we could
-    // keep them for future extensions.
-    // If we ever want to allocate other "types" into a chunk of off-heap
-    // memory then the SRC_TYPE would be the way to go.
-    // For example we may want to allocate the memory for the off-heap
-    // RegionEntry in off-heap memory without it being of type GFE.
-    // When it is of type GFE then it either needs to be the bytes
-    // of a byte array or it needs to be a serialized java object.
-    // For the RegionEntry we may want all the primitive fields of
-    // the entry at certain offsets in the off-heap memory so we could
-    // access them directly in native byte format (i.e. no serialization).
-    // Note that for every SRC_TYPE we should have a ChunkType subclass.
-    public final static int SRC_TYPE_UNUSED0 = 0 << SRC_TYPE_SHIFT;
-    public final static int SRC_TYPE_UNUSED1 = 1 << SRC_TYPE_SHIFT;
-    public final static int SRC_TYPE_UNUSED2 = 2 << SRC_TYPE_SHIFT;
-    public final static int SRC_TYPE_UNUSED3 = 3 << SRC_TYPE_SHIFT;
-    public final static int SRC_TYPE_GFE = 4 << SRC_TYPE_SHIFT;
-    public final static int SRC_TYPE_UNUSED5 = 5 << SRC_TYPE_SHIFT;
-    public final static int SRC_TYPE_UNUSED6 = 6 << SRC_TYPE_SHIFT;
-    public final static int SRC_TYPE_UNUSED7 = 7 << SRC_TYPE_SHIFT;
-    
-    protected Chunk(long memoryAddress, int chunkSize, ChunkType chunkType) {
-      validateAddressAndSize(memoryAddress, chunkSize);
-      this.memoryAddress = memoryAddress;
-      setSize(chunkSize);
-      UnsafeMemoryChunk.writeAbsoluteIntVolatile(getMemoryAddress()+REF_COUNT_OFFSET, MAGIC_NUMBER|chunkType.getSrcType());
-    }
-    public void readyForFree() {
-      UnsafeMemoryChunk.writeAbsoluteIntVolatile(getMemoryAddress()+REF_COUNT_OFFSET, 0);
-    }
-    public void readyForAllocation(ChunkType chunkType) {
-      if (!UnsafeMemoryChunk.writeAbsoluteIntVolatile(getMemoryAddress()+REF_COUNT_OFFSET, 0, MAGIC_NUMBER|chunkType.getSrcType())) {
-        throw new IllegalStateException("Expected 0 but found " + Integer.toHexString(UnsafeMemoryChunk.readAbsoluteIntVolatile(getMemoryAddress()+REF_COUNT_OFFSET)));
-      }
-    }
-    /**
-     * Should only be used by FakeChunk subclass
-     */
-    protected Chunk() {
-      this.memoryAddress = 0L;
-    }
-    
-    /**
-     * Used to create a Chunk given an existing, already allocated,
-     * memoryAddress. The off heap header has already been initialized.
-     */
-    protected Chunk(long memoryAddress) {
-      validateAddress(memoryAddress);
-      this.memoryAddress = memoryAddress;
-    }
-    
-    protected Chunk(Chunk chunk) {
-      this.memoryAddress = chunk.memoryAddress;
-    }
-    
-    /**
-     * Throw an exception if this chunk is not allocated
-     */
-    public void checkIsAllocated() {
-      int originalBits = UnsafeMemoryChunk.readAbsoluteIntVolatile(this.memoryAddress+REF_COUNT_OFFSET);
-      if ((originalBits&MAGIC_MASK) != MAGIC_NUMBER) {
-        throw new IllegalStateException("It looks like this off heap memory was already freed. rawBits=" + Integer.toHexString(originalBits));
-      }
-    }
-    
-    public void incSize(int inc) {
-      setSize(getSize()+inc);
-    }
-    
-    protected void beforeReturningToAllocator() {
-      
-    }
-
-    @Override
-    public int getSize() {
-      return getSize(this.memoryAddress);
-    }
-
-    public void setSize(int size) {
-      setSize(this.memoryAddress, size);
-    }
-
-    public long getMemoryAddress() {
-      return this.memoryAddress;
-    }
-    
-    public int getDataSize() {
-      /*int dataSizeDelta = UnsafeMemoryChunk.readAbsoluteInt(this.memoryAddress+REF_COUNT_OFFSET);
-      dataSizeDelta &= DATA_SIZE_DELTA_MASK;
-      dataSizeDelta >>= DATA_SIZE_SHIFT;
-      return getSize() - dataSizeDelta;*/
-      return getDataSize(this.memoryAddress);
-    }
-    
-    protected static int getDataSize(long memoryAdress) {
-      int dataSizeDelta = UnsafeMemoryChunk.readAbsoluteInt(memoryAdress+REF_COUNT_OFFSET);
-      dataSizeDelta &= DATA_SIZE_DELTA_MASK;
-      dataSizeDelta >>= DATA_SIZE_SHIFT;
-      return getSize(memoryAdress) - dataSizeDelta;
-    }
-    
-    protected long getBaseDataAddress() {
-      return this.memoryAddress+OFF_HEAP_HEADER_SIZE;
-    }
-    protected int getBaseDataOffset() {
-      return 0;
-    }
-    
-    /**
-     * Creates and returns a direct ByteBuffer that contains the contents of this Chunk.
-     * Note that the returned ByteBuffer has a reference to this chunk's
-     * off-heap address so it can only be used while this Chunk is retained.
-     * @return the created direct byte buffer or null if it could not be created.
-     */
-    @Unretained
-    public ByteBuffer createDirectByteBuffer() {
-      return basicCreateDirectByteBuffer(getBaseDataAddress(), getDataSize());
-    }
-    @Override
-    public void sendTo(DataOutput out) throws IOException {
-      if (!this.isCompressed() && out instanceof HeapDataOutputStream) {
-        ByteBuffer bb = createDirectByteBuffer();
-        if (bb != null) {
-          HeapDataOutputStream hdos = (HeapDataOutputStream) out;
-          if (this.isSerialized()) {
-            hdos.write(bb);
-          } else {
-            hdos.writeByte(DSCODE.BYTE_ARRAY);
-            InternalDataSerializer.writeArrayLength(bb.remaining(), hdos);
-            hdos.write(bb);
-          }
-          return;
-        }
-      }
-      super.sendTo(out);
-    }
-    
-    @Override
-    public void sendAsByteArray(DataOutput out) throws IOException {
-      if (!isCompressed() && out instanceof HeapDataOutputStream) {
-        ByteBuffer bb = createDirectByteBuffer();
-        if (bb != null) {
-          HeapDataOutputStream hdos = (HeapDataOutputStream) out;
-          InternalDataSerializer.writeArrayLength(bb.remaining(), hdos);
-          hdos.write(bb);
-          return;
-        }
-      }
-      super.sendAsByteArray(out);
-    }
-       
-    private static volatile Class dbbClass = null;
-    private static volatile Constructor dbbCtor = null;
-    private static volatile boolean dbbCreateFailed = false;
-    
-    /**
-     * @return the created direct byte buffer or null if it could not be created.
-     */
-    private static ByteBuffer basicCreateDirectByteBuffer(long baseDataAddress, int dataSize) {
-      if (dbbCreateFailed) {
-        return null;
-      }
-      Constructor ctor = dbbCtor;
-      if (ctor == null) {
-        Class c = dbbClass;
-        if (c == null) {
-          try {
-            c = Class.forName("java.nio.DirectByteBuffer");
-          } catch (ClassNotFoundException e) {
-            //throw new IllegalStateException("Could not find java.nio.DirectByteBuffer", e);
-            dbbCreateFailed = true;
-            dbbAddressFailed = true;
-            return null;
-          }
-          dbbClass = c;
-        }
-        try {
-          ctor = c.getDeclaredConstructor(long.class, int.class);
-        } catch (NoSuchMethodException | SecurityException e) {
-          //throw new IllegalStateException("Could not get constructor DirectByteBuffer(long, int)", e);
-          dbbClass = null;
-          dbbCreateFailed = true;
-          return null;
-        }
-        ctor.setAccessible(true);
-        dbbCtor = ctor;
-      }
-      try {
-        return (ByteBuffer)ctor.newInstance(baseDataAddress, dataSize);
-      } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
-        //throw new IllegalStateException("Could not create an instance using DirectByteBuffer(long, int)", e);
-        dbbClass = null;
-        dbbCtor = null;
-        dbbCreateFailed = true;
-        return null;
-      }
-    }
-    private static volatile Method dbbAddressMethod = null;
-    private static volatile boolean dbbAddressFailed = false;
-    
-    /**
-     * Returns the address of the Unsafe memory for the first byte of a direct ByteBuffer.
-     * If the buffer is not direct or the address can not be obtained return 0.
-     */
-    public static long getDirectByteBufferAddress(ByteBuffer bb) {
-      if (!bb.isDirect()) {
-        return 0L;
-      }
-      if (dbbAddressFailed) {
-        return 0L;
-      }
-      Method m = dbbAddressMethod;
-      if (m == null) {
-        Class c = dbbClass;
-        if (c == null) {
-          try {
-            c = Class.forName("java.nio.DirectByteBuffer");
-          } catch (ClassNotFoundException e) {
-            //throw new IllegalStateException("Could not find java.nio.DirectByteBuffer", e);
-            dbbCreateFailed = true;
-            dbbAddressFailed = true;
-            return 0L;
-          }
-          dbbClass = c;
-        }
-        try {
-          m = c.getDeclaredMethod("address");
-        } catch (NoSuchMethodException | SecurityException e) {
-          //throw new IllegalStateException("Could not get method DirectByteBuffer.address()", e);
-          dbbClass = null;
-          dbbAddressFailed = true;
-          return 0L;
-        }
-        m.setAccessible(true);
-        dbbAddressMethod = m;
-      }
-      try {
-        return (Long)m.invoke(bb);
-      } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
-        //throw new IllegalStateException("Could not create an invoke DirectByteBuffer.address()", e);
-        dbbClass = null;
-        dbbAddressMethod = null;
-        dbbAddressFailed = true;
-        return 0L;
-      }
-    }
-    /**
-     * Computes the absolute address of a byte inside this chunk's data for use with unsafe apis.
-     * @param offset distance in bytes from this chunk's first data byte. Must be >= 0.
-     * @param size the number of bytes the caller will access starting at the returned address. Must be > 0. It is only used by the assertions, which check that the whole range lies inside this chunk's data.
-     * @return this chunk's base data address plus offset
-     */
-    public long getUnsafeAddress(int offset, int size) {
-      assert offset >= 0 && offset + size <= getDataSize(): "Offset=" + offset + ",size=" + size + ",dataSize=" + getDataSize() + ", chunkSize=" + getSize() + ", but offset + size must be <= " + getDataSize();
-      assert size > 0;
-      // No slab validation is done here; only the assertions above guard the range.
-      return getBaseDataAddress() + offset;
-    }
-    
-    /**
-     * Reads one byte of this chunk's data.
-     * @param offset offset of the byte from the chunk's base data address; must satisfy
-     *        0 <= offset < getDataSize() (checked by assertion only)
-     * @return the byte stored at that offset
-     */
-    @Override
-    public byte readByte(int offset) {
-      // A negative offset would address memory before this chunk's data, so check both bounds.
-      assert offset >= 0 && offset < getDataSize();
-      return UnsafeMemoryChunk.readAbsoluteByte(getBaseDataAddress() + offset);
-    }
-
-    /**
-     * Writes one byte into this chunk's data.
-     * @param offset offset of the byte from the chunk's base data address; must satisfy
-     *        0 <= offset < getDataSize() (checked by assertion only)
-     * @param value the byte to store
-     */
-    @Override
-    public void writeByte(int offset, byte value) {
-      // A negative offset would address memory before this chunk's data, so check both bounds.
-      assert offset >= 0 && offset < getDataSize();
-      UnsafeMemoryChunk.writeAbsoluteByte(getBaseDataAddress() + offset, value);
-    }
-
-    /**
-     * Fills the whole of the given array from this chunk, starting at offset.
-     * Delegates to the four argument readBytes with a zero array offset.
-     */
-    @Override
-    public void readBytes(int offset, byte[] bytes) {
-      final int length = bytes.length;
-      readBytes(offset, bytes, 0, length);
-    }
-
-    /**
-     * Copies the whole of the given array into this chunk, starting at offset.
-     * Delegates to the four argument writeBytes with a zero array offset.
-     */
-    @Override
-    public void writeBytes(int offset, byte[] bytes) {
-      final int length = bytes.length;
-      writeBytes(offset, bytes, 0, length);
-    }
-
-    /**
-     * Returns the absolute address of the data byte at offset.
-     * @param offset offset from this chunk's base data address
-     * @param size number of bytes the caller will read; only used by the assertion
-     * @return this chunk's base data address plus offset
-     */
-    public long getAddressForReading(int offset, int size) {
-      assert offset+size <= getDataSize();
-      final long base = getBaseDataAddress();
-      return base + offset;
-    }
-    
-    /**
-     * Copies size bytes of this chunk's data, starting at offset, into bytes starting at bytesOffset.
-     * @param offset offset from this chunk's base data address; must be >= 0
-     * @param bytes destination array
-     * @param bytesOffset index in bytes of the first byte written
-     * @param size number of bytes to copy; must be >= 0 and offset+size must not exceed getDataSize()
-     */
-    @Override
-    public void readBytes(int offset, byte[] bytes, int bytesOffset, int size) {
-      // Check the lower bounds too: a negative offset or size would pass the upper-bound test alone.
-      assert offset >= 0 && size >= 0 && offset+size <= getDataSize();
-      UnsafeMemoryChunk.readAbsoluteBytes(getBaseDataAddress() + offset, bytes, bytesOffset, size);
-    }
-
-    /**
-     * Copies size bytes from bytes, starting at bytesOffset, into this chunk's data starting at offset.
-     * The target range is also checked with validateAddressAndSizeWithinSlab before the write.
-     * @param offset offset from this chunk's base data address; must be >= 0
-     * @param bytes source array
-     * @param bytesOffset index in bytes of the first byte read
-     * @param size number of bytes to copy; must be >= 0 and offset+size must not exceed getDataSize()
-     */
-    @Override
-    public void writeBytes(int offset, byte[] bytes, int bytesOffset, int size) {
-      // Check the lower bounds too: a negative offset or size would pass the upper-bound test alone.
-      assert offset >= 0 && size >= 0 && offset+size <= getDataSize();
-      final long targetAddress = getBaseDataAddress() + offset;
-      validateAddressAndSizeWithinSlab(targetAddress, size);
-      UnsafeMemoryChunk.writeAbsoluteBytes(targetAddress, bytes, bytesOffset, size);
-    }
-    
-    /**
-     * Releases one reference to this chunk by delegating to the static
-     * release(long, boolean) with this chunk's address and true.
-     */
-    @Override
-    public void release() {
-      final boolean issueOnReturnCallback = true;
-      release(this.memoryAddress, issueOnReturnCallback);
-    }
-
-    /**
-     * Orders chunks by size and then, for equally sized chunks, by memory address.
-     * @param o the chunk to compare with
-     * @return a negative, zero or positive value; zero only when both size and address match
-     */
-    @Override
-    public int compareTo(Chunk o) {
-      // Compare directly rather than taking the sign of a difference, which can overflow.
-      int result = Integer.compare(getSize(), o.getSize());
-      if (result == 0) {
-        // For the same sized chunks we really don't care about their order
-        // but we need compareTo to only return 0 if the two chunks are identical
-        result = Long.compare(getMemoryAddress(), o.getMemoryAddress());
-      }
-      return result;
-    }
-    
-    /**
-     * Two chunks are equal when they have the same memory address.
-     * Anything that is not a Chunk is never equal.
-     */
-    @Override
-    public boolean equals(Object o) {
-      if (!(o instanceof Chunk)) {
-        return false;
-      }
-      final Chunk other = (Chunk) o;
-      return getMemoryAddress() == other.getMemoryAddress();
-    }
-    
-    /**
-     * Hashes the memory address by xor-ing its upper and lower 32 bits.
-     */
-    @Override
-    public int hashCode() {
-      final long address = this.getMemoryAddress();
-      final long folded = address ^ (address >>> 32);
-      return (int) folded;
-    }
-
-    // OffHeapCachedDeserializable methods 
-    
-    /**
-     * Stores the given bytes at the start of this chunk's data.
-     */
-    @Override
-    public void setSerializedValue(byte[] value) {
-      final int startOffset = 0;
-      writeBytes(startOffset, value);
-    }
-    
-    /**
-     * Reads this chunk's bytes and decompresses them with the context's compressor.
-     * The decompression is bracketed by startDecompression/endDecompression on the
-     * context's cache perf stats.
-     * @param context supplies the compressor and the stats
-     * @return the bytes returned by the compressor
-     */
-    public byte[] getDecompressedBytes(RegionEntryContext context) {
-      final byte[] compressed = getCompressedBytes();
-      final long startTime = context.getCachePerfStats().startDecompression();
-      final byte[] decompressed = context.getCompressor().decompress(compressed);
-      context.getCachePerfStats().endDecompression(startTime);
-      return decompressed;
-    }
-    
-    /**
-     * Copies the raw, possibly compressed, data of this chunk into a new array
-     * of getDataSize() bytes and counts the read on the allocator's stats.
-     */
-    public byte[] getCompressedBytes() {
-      final byte[] copy = new byte[getDataSize()];
-      readBytes(0, copy);
-      getAllocator().getStats().incReads();
-      return copy;
-    }
-    /**
-     * Returns a copy of this chunk's data bytes.
-     * The bytes are read before the compressed check is made.
-     * @throws UnsupportedOperationException if this chunk is compressed
-     */
-    protected byte[] getRawBytes() {
-      final byte[] raw = getCompressedBytes();
-      // TODO OFFHEAP: change the following to assert !isCompressed();
-      if (!isCompressed()) {
-        return raw;
-      }
-      throw new UnsupportedOperationException();
-    }
-
-    /**
-     * Returns this chunk's value in serialized form.
-     * If the stored value is not serialized it is a plain byte[], so it is passed
-     * through EntryEventImpl.serialize to make it look like a serialized byte[].
-     */
-    @Override
-    public byte[] getSerializedValue() {
-      final byte[] raw = getRawBytes();
-      if (isSerialized()) {
-        return raw;
-      }
-      return EntryEventImpl.serialize(raw);
-    }
-    
-    /**
-     * Returns the value stored in this chunk as an object.
-     * A serialized value is deserialized from the raw bytes; otherwise the raw
-     * bytes themselves are the value. The r and re parameters are not used.
-     */
-    @Override
-    public Object getDeserializedValue(Region r, RegionEntry re) {
-      if (!isSerialized()) {
-        return getRawBytes();
-      }
-      // TODO OFFHEAP: debug deserializeChunk
-      return EntryEventImpl.deserialize(getRawBytes());
-    }
-    
-    /**
-     * Returns getSize() rather than getDataSize() so that the off heap header
-     * overhead is included. Nothing is added for the reference to this chunk
-     * because that belongs to the region entry size, not to this object.
-     */
-    @Override
-    public int getSizeInBytes() {
-      final int sizeWithHeader = getSize();
-      return sizeWithHeader;
-    }
-
-    /**
-     * Returns the size of the stored value alone, which is getDataSize().
-     */
-    @Override
-    public int getValueSizeInBytes() {
-      final int valueSize = getDataSize();
-      return valueSize;
-    }
-
-    /**
-     * Not implemented: always throws.
-     * The commented out lines below sketch an implementation that was never enabled.
-     * @throws UnsupportedOperationException always
-     */
-    @Override
-    public void copyBytes(int src, int dst, int size) {
-      throw new UnsupportedOperationException("Implement if used");
-//      assert src+size <= getDataSize();
-//      assert dst+size < getDataSize();
-//      getSlabs()[this.getSlabIdx()].copyBytes(getBaseDataAddress()+src, getBaseDataAddress()+dst, size);
-    }
-
-    /**
-     * Tests IS_SERIALIZED_BIT in the int stored at REF_COUNT_OFFSET of this chunk.
-     */
-    @Override
-    public boolean isSerialized() {
-      final int headerBits = UnsafeMemoryChunk.readAbsoluteInt(this.memoryAddress+REF_COUNT_OFFSET);
-      return (headerBits & IS_SERIALIZED_BIT) != 0;
-    }
-
-    /**
-     * Tests IS_COMPRESSED_BIT in the int stored at REF_COUNT_OFFSET of this chunk.
-     */
-    @Override
-    public boolean isCompressed() {
-      final int headerBits = UnsafeMemoryChunk.readAbsoluteInt(this.memoryAddress+REF_COUNT_OFFSET);
-      return (headerBits & IS_COMPRESSED_BIT) != 0;
-    }
-
-    /**
-     * Tries to add a reference to this chunk by delegating to the static retain(long).
-     * @return the result of retain(long) for this chunk's address
-     */
-    @Override
-    public boolean retain() {
-      final long address = this.memoryAddress;
-      return retain(address);
-    }
-
-    /**
-     * Returns this chunk's reference count by delegating to the static getRefCount(long).
-     */
-    @Override
-    public int getRefCount() {
-      final long address = this.memoryAddress;
-      return getRefCount(address);
-    }
-
-    // By adding this one object ref to Chunk we are able to have free lists that only have memory overhead of a single objref per free item.
-    // The field below is commented out, so no such reference is currently kept and both accessors throw.
-    //private Node cbNodeNext;
-    /**
-     * Not supported: always throws.
-     * @throws UnsupportedOperationException always
-     */
-    @Override
-    public void setNextCBNode(Node next) {
-      throw new UnsupportedOperationException();
-      //this.cbNodeNext = next;
-    }
-
-    /**
-     * Not supported: always throws.
-     * @throws UnsupportedOperationException always
-     */
-    @Override
-    public Node getNextCBNode() {
-      throw new UnsupportedOperationException();
-      //return this.cbNodeNext;
-    }
-    /**
-     * Reads the int stored at CHUNK_SIZE_OFFSET of the chunk at memAddr,
-     * after validating the address.
-     */
-    public static int getSize(long memAddr) {
-      validateAddress(memAddr);
-      final long sizeAddress = memAddr+CHUNK_SIZE_OFFSET;
-      return UnsafeMemoryChunk.readAbsoluteInt(sizeAddress);
-    }
-    /**
-     * Writes size as the int at CHUNK_SIZE_OFFSET of the chunk at memAddr,
-     * after validating the address and size.
-     */
-    public static void setSize(long memAddr, int size) {
-      validateAddressAndSize(memAddr, size);
-      final long sizeAddress = memAddr+CHUNK_SIZE_OFFSET;
-      UnsafeMemoryChunk.writeAbsoluteInt(sizeAddress, size);
-    }
-    /**
-     * Reads the long stored OFF_HEAP_HEADER_SIZE bytes into the chunk at memAddr,
-     * after validating the address.
-     */
-    public static long getNext(long memAddr) {
-      validateAddress(memAddr);
-      final long nextSlot = memAddr+OFF_HEAP_HEADER_SIZE;
-      return UnsafeMemoryChunk.readAbsoluteLong(nextSlot);
-    }
-    /**
-     * Writes next as the long stored OFF_HEAP_HEADER_SIZE bytes into the chunk at memAddr,
-     * after validating memAddr.
-     */
-    public static void setNext(long memAddr, long next) {
-      validateAddress(memAddr);
-      final long nextSlot = memAddr+OFF_HEAP_HEADER_SIZE;
-      UnsafeMemoryChunk.writeAbsoluteLong(nextSlot, next);
-    }
-    /**
-     * Asks the allocator's chunk factory for the chunk type at this chunk's address.
-     */
-    @Override
-    public ChunkType getChunkType() {
-      final long address = getMemoryAddress();
-      return getAllocator().getChunkFactory().getChunkTypeForAddress(address);
-    }
-    /**
-     * Returns the source type bits of the chunk at memAddr shifted right by SRC_TYPE_SHIFT.
-     */
-    public static int getSrcTypeOrdinal(long memAddr) {
-      final int srcType = getSrcType(memAddr);
-      return srcType >> SRC_TYPE_SHIFT;
-    }
-    /**
-     * Reads the int at REF_COUNT_OFFSET of the chunk at memAddr and returns its source type bits.
-     */
-    public static int getSrcType(long memAddr) {
-      final int rawBits = UnsafeMemoryChunk.readAbsoluteInt(memAddr+REF_COUNT_OFFSET);
-      return getSrcTypeFromRawBits(rawBits);
-    }
-    /**
-     * Masks rawBits with SRC_TYPE_MASK.
-     */
-    public static int getSrcTypeFromRawBits(int rawBits) {
-      final int srcType = rawBits & SRC_TYPE_MASK;
-      return srcType;
-    }
-    /**
-     * Returns the source type bits of rawBits shifted right by SRC_TYPE_SHIFT.
-     */
-    public static int getSrcTypeOrdinalFromRawBits(int rawBits) {
-      final int srcType = getSrcTypeFromRawBits(rawBits);
-      return srcType >> SRC_TYPE_SHIFT;
-    }
-    
-    /**
-     * Writes FILL_BYTE over the chunk's memory that follows its first MIN_CHUNK_SIZE bytes.
-     * @param baseAddress the starting address for a {@link Chunk}.
-     */
-    public static void fill(long baseAddress) {
-      final int fillLength = getSize(baseAddress) - MIN_CHUNK_SIZE;
-      final long fillStart = baseAddress + MIN_CHUNK_SIZE;
-      UnsafeMemoryChunk.fill(fillStart, fillLength, FILL_BYTE);
-    }
-    
-    /**
-     * Checks that every long following the first MIN_CHUNK_SIZE bytes of this chunk
-     * still equals FILL_PATTERN. The scan advances TINY_MULTIPLE bytes at a time and
-     * assumes TINY_MULTIPLE is 8 bytes.
-     * @throws IllegalStateException when the pattern has been violated.
-     */
-    public void validateFill() {
-      assert TINY_MULTIPLE == 8;
-      
-      final long fillStart = getMemoryAddress() + MIN_CHUNK_SIZE;
-      final int fillLength = getSize() - MIN_CHUNK_SIZE;
-      
-      int pos = 0;
-      while (pos < fillLength) {
-        final long word = UnsafeMemoryChunk.readAbsoluteLong(fillStart + pos);
-        if (word != FILL_PATTERN) {
-          throw new IllegalStateException("Fill pattern violated for chunk " + getMemoryAddress() + " with size " + getSize());
-        }
-        pos += TINY_MULTIPLE;
-      }
-    }
-
-    /**
-     * Sets IS_SERIALIZED_BIT in the int stored at REF_COUNT_OFFSET of this chunk.
-     * Passing false does nothing; this method never clears the bit.
-     * @param isSerialized true to set the bit
-     * @throws IllegalStateException if the stored bits do not carry MAGIC_NUMBER
-     */
-    public void setSerialized(boolean isSerialized) {
-      if (isSerialized) {
-        int bits;
-        int originalBits;
-        // Re-read and retry until the conditional write reports success
-        // (writeAbsoluteIntVolatile presumably is a compare-and-set -- confirm).
-        do {
-          originalBits = UnsafeMemoryChunk.readAbsoluteIntVolatile(this.memoryAddress+REF_COUNT_OFFSET);
-          if ((originalBits&MAGIC_MASK) != MAGIC_NUMBER) {
-            throw new IllegalStateException("It looks like this off heap memory was already freed. rawBits=" + Integer.toHexString(originalBits));
-          }
-          bits = originalBits | IS_SERIALIZED_BIT;
-        } while (!UnsafeMemoryChunk.writeAbsoluteIntVolatile(this.memoryAddress+REF_COUNT_OFFSET, originalBits, bits));
-      }
-    }
-    /**
-     * Sets IS_COMPRESSED_BIT in the int stored at REF_COUNT_OFFSET of this chunk.
-     * Passing false does nothing; this method never clears the bit.
-     * @param isCompressed true to set the bit
-     * @throws IllegalStateException if the stored bits do not carry MAGIC_NUMBER
-     */
-    public void setCompressed(boolean isCompressed) {
-      if (isCompressed) {
-        int bits;
-        int originalBits;
-        // Re-read and retry until the conditional write reports success
-        // (writeAbsoluteIntVolatile presumably is a compare-and-set -- confirm).
-        do {
-          originalBits = UnsafeMemoryChunk.readAbsoluteIntVolatile(this.memoryAddress+REF_COUNT_OFFSET);
-          if ((originalBits&MAGIC_MASK) != MAGIC_NUMBER) {
-            throw new IllegalStateException("It looks like this off heap memory was already freed. rawBits=" + Integer.toHexString(originalBits));
-          }
-          bits = originalBits | IS_COMPRESSED_BIT;
-        } while (!UnsafeMemoryChunk.writeAbsoluteIntVolatile(this.memoryAddress+REF_COUNT_OFFSET, originalBits, bits));
-      }
-    }
-    /**
-     * Records the data size of this chunk. What is stored is not dataSize itself but
-     * the difference getSize() - dataSize, shifted into the DATA_SIZE_DELTA_MASK bits
-     * of the int at REF_COUNT_OFFSET.
-     * @param dataSize the number of data bytes; must not exceed getSize() and the
-     *        difference must fit in the delta bits (both checked by assertion only)
-     * @throws IllegalStateException if the stored bits do not carry MAGIC_NUMBER
-     */
-    public void setDataSize(int dataSize) { // KIRK
-      assert dataSize <= getSize();
-      int delta = getSize() - dataSize;
-      assert delta <= (DATA_SIZE_DELTA_MASK >> DATA_SIZE_SHIFT);
-      delta <<= DATA_SIZE_SHIFT;
-      int bits;
-      int originalBits;
-      // Re-read and retry until the conditional write reports success.
-      do {
-        originalBits = UnsafeMemoryChunk.readAbsoluteIntVolatile(this.memoryAddress+REF_COUNT_OFFSET);
-        if ((originalBits&MAGIC_MASK) != MAGIC_NUMBER) {
-          throw new IllegalStateException("It looks like this off heap memory was already freed. rawBits=" + Integer.toHexString(originalBits));
-        }
-        bits = originalBits;
-        bits &= ~DATA_SIZE_DELTA_MASK; // clear the old dataSizeDelta bits
-        bits |= delta; // set the dataSizeDelta bits to the new delta value
-      } while (!UnsafeMemoryChunk.writeAbsoluteIntVolatile(this.memoryAddress+REF_COUNT_OFFSET, originalBits, bits));
-    }
-    
-    /**
-     * Moves the reference count of this chunk from zero to one by adding 1 to the
-     * int stored at REF_COUNT_OFFSET.
-     * @throws IllegalStateException if the stored bits do not carry MAGIC_NUMBER,
-     *         or if the current count (rawBits & REF_COUNT_MASK) is not zero
-     */
-    public void initializeUseCount() {
-      int rawBits;
-      // Re-read and retry until the conditional write reports success.
-      do {
-        rawBits = UnsafeMemoryChunk.readAbsoluteIntVolatile(this.memoryAddress+REF_COUNT_OFFSET);
-        if ((rawBits&MAGIC_MASK) != MAGIC_NUMBER) {
-          throw new IllegalStateException("It looks like this off heap memory was already freed. rawBits=" + Integer.toHexString(rawBits));
-        }
-        int uc = rawBits & REF_COUNT_MASK;
-        if (uc != 0) {
-          throw new IllegalStateException("Expected use count to be zero but it was: " + uc + " rawBits=0x" + Integer.toHexString(rawBits));
-        }
-      } while (!UnsafeMemoryChunk.writeAbsoluteIntVolatile(this.memoryAddress+REF_COUNT_OFFSET, rawBits, rawBits+1));
-    }
-
-    /**
-     * Reads the int at REF_COUNT_OFFSET of the chunk at memAddr and masks it with REF_COUNT_MASK.
-     */
-    public static int getRefCount(long memAddr) {
-      final int rawBits = UnsafeMemoryChunk.readAbsoluteInt(memAddr+REF_COUNT_OFFSET);
-      return rawBits & REF_COUNT_MASK;
-    }
-
-    /**
-     * Tries to add one reference to the chunk at memAddr.
-     * @param memAddr the address of the chunk; validated first
-     * @return true if the count was incremented; false if the stored bits do not
-     *         carry MAGIC_NUMBER or the current count is already zero
-     * @throws IllegalStateException if the count is already MAX_REF_COUNT, or if the
-     *         loop has gone round more than 1000 times without a successful write
-     */
-    public static boolean retain(long memAddr) {
-      validateAddress(memAddr);
-      int uc;
-      int rawBits;
-      int retryCount = 0;
-      do {
-        rawBits = UnsafeMemoryChunk.readAbsoluteIntVolatile(memAddr+REF_COUNT_OFFSET);
-        if ((rawBits&MAGIC_MASK) != MAGIC_NUMBER) {
-          // same as uc == 0
-          // TODO MAGIC_NUMBER rethink its use and interaction with compactor fragments
-          return false;
-        }
-        uc = rawBits & REF_COUNT_MASK;
-        if (uc == MAX_REF_COUNT) {
-          throw new IllegalStateException("Maximum use count exceeded. rawBits=" + Integer.toHexString(rawBits));
-        } else if (uc == 0) {
-          return false;
-        }
-        // Bound the number of attempts so a write that never succeeds is reported instead of spinning forever.
-        retryCount++;
-        if (retryCount > 1000) {
-          throw new IllegalStateException("tried to write " + (rawBits+1) + " to @" + Long.toHexString(memAddr) + " 1,000 times.");
-        }
-      } while (!UnsafeMemoryChunk.writeAbsoluteIntVolatile(memAddr+REF_COUNT_OFFSET, rawBits, rawBits+1));
-      //debugLog("use inced ref count " + (uc+1) + " @" + Long.toHexString(memAddr), true);
-      if (trackReferenceCounts()) {
-        refCountChanged(memAddr, false, uc+1);
-      }
-
-      return true;
-    }
-    /**
-     * Removes one reference from the chunk at memAddr. When the count was 1 the whole
-     * int at REF_COUNT_OFFSET is written as 0 and the chunk is handed back to the
-     * allocator with freeChunk; otherwise the stored int is simply decremented.
-     * @param memAddr the address of the chunk; validated first
-     * @param issueOnReturnCallback currently has no effect: the only code that reads
-     *        it is commented out below
-     * @throws IllegalStateException if the stored bits do not carry MAGIC_NUMBER or
-     *         the current count is already zero
-     */
-    public static void release(final long memAddr, boolean issueOnReturnCallback) {
-      validateAddress(memAddr);
-      int newCount;
-      int rawBits;
-      boolean returnToAllocator;
-      do {
-        // Reset on every attempt since a retry may see a different count.
-        returnToAllocator = false;
-        rawBits = UnsafeMemoryChunk.readAbsoluteIntVolatile(memAddr+REF_COUNT_OFFSET);
-        if ((rawBits&MAGIC_MASK) != MAGIC_NUMBER) {
-          String msg = "It looks like off heap memory @" + Long.toHexString(memAddr) + " was already freed. rawBits=" + Integer.toHexString(rawBits) + " history=" + getFreeRefCountInfo(memAddr);
-          //debugLog(msg, true);
-          throw new IllegalStateException(msg);
-        }
-        int curCount = rawBits&REF_COUNT_MASK;
-        if ((curCount) == 0) {
-          //debugLog("too many frees @" + Long.toHexString(memAddr), true);
-          throw new IllegalStateException("Memory has already been freed." + " history=" + getFreeRefCountInfo(memAddr) /*+ System.identityHashCode(this)*/);
-        }
-        if (curCount == 1) {
-          newCount = 0; // clear the use count, bits, and the delta size since it will be freed.
-          returnToAllocator = true;
-        } else {
-          // newCount holds the full raw bits here, not just the count.
-          newCount = rawBits-1;
-        }
-      } while (!UnsafeMemoryChunk.writeAbsoluteIntVolatile(memAddr+REF_COUNT_OFFSET, rawBits, newCount));
-      //debugLog("free deced ref count " + (newCount&USE_COUNT_MASK) + " @" + Long.toHexString(memAddr), true);
-      if (returnToAllocator ) {
-        /*
-        if(issueOnReturnCallback) {
-         final GemFireCacheImpl.StaticSystemCallbacks sysCb =
-              GemFireCacheImpl.FactoryStatics.systemCallbacks;
-          if(sysCb != null ) {
-            ChunkType ct = SimpleMemoryAllocatorImpl.getAllocator().getChunkFactory().getChunkTypeForRawBits(rawBits);
-            int dataSizeDelta = computeDataSizeDelta(rawBits);
-            sysCb.beforeReturningOffHeapMemoryToAllocator(memAddr, ct, dataSizeDelta);
-          }
-        }
-        */
-       
-        if (trackReferenceCounts()) {
-          if (trackFreedReferenceCounts()) {
-            refCountChanged(memAddr, true, newCount&REF_COUNT_MASK);
-          }
-          freeRefCountInfo(memAddr);
-        }
-        
-        // Use fill pattern for free list data integrity check.
-        if(SimpleMemoryAllocatorImpl.getAllocator().validateMemoryWithFill) {
-          fill(memAddr);
-        }
-        
-        getAllocator().freeChunk(memAddr);
-      } else {
-        if (trackReferenceCounts()) {
-          refCountChanged(memAddr, true, newCount&REF_COUNT_MASK);
-        }
-      }
-    }
-    
-    /**
-     * Extracts the data size delta from rawBits: the bits selected by
-     * DATA_SIZE_DELTA_MASK, shifted right by DATA_SIZE_SHIFT.
-     */
-    private static int computeDataSizeDelta(int rawBits) {
-      return (rawBits & DATA_SIZE_DELTA_MASK) >> DATA_SIZE_SHIFT;
-    }
-    
-    /**
-     * Returns the description built by toStringForOffHeapByteSource, which does not
-     * include the stored value.
-     */
-    @Override
-    public String toString() {
-      return toStringForOffHeapByteSource();
-      // This old impl is not safe because it calls getDeserializedForReading and we have code that call toString that does not inc the refcount.
-      // Also if this Chunk is compressed we don't know how to decompress it.
-      //return super.toString() + ":<dataSize=" + getDataSize() + " refCount=" + getRefCount() + " addr=" + getMemoryAddress() + " storedObject=" + getDeserializedForReading() + ">";
-    }
-    
-    /**
-     * Builds a description made of super.toString() followed by the data size,
-     * the reference count and the memory address in hex.
-     */
-    protected String toStringForOffHeapByteSource() {
-      final StringBuilder sb = new StringBuilder();
-      sb.append(super.toString());
-      sb.append(":<dataSize=").append(getDataSize());
-      sb.append(" refCount=").append(getRefCount());
-      sb.append(" addr=").append(Long.toHexString(getMemoryAddress()));
-      sb.append(">");
-      return sb.toString();
-    }
-    
-    /**
-     * Reports ALLOCATED while the reference count is above zero and DEALLOCATED otherwise.
-     */
-    @Override
-    public State getState() {
-      return getRefCount() > 0 ? State.ALLOCATED : State.DEALLOCATED;
-    }
-    /**
-     * Not supported: always throws.
-     * @throws UnsupportedOperationException always
-     */
-    @Override
-    public MemoryBlock getNextBlock() {
-      throw new UnsupportedOperationException();
-    }
-    /**
-     * Returns getSize() as the size of this block.
-     */
-    @Override
-    public int getBlockSize() {
-      return getSize();
-    }
-    /**
-     * Not supported: always throws.
-     * @throws UnsupportedOperationException always
-     */
-    @Override
-    public int getSlabId() {
-      throw new UnsupportedOperationException();
-    }
-    /**
-     * Always returns -1.
-     */
-    @Override
-    public int getFreeListId() {
-      return -1;
-    }
-    /**
-     * Always returns null.
-     */
-    @Override
-    public String getDataType() {
-      return null;
-    }
-    /**
-     * Always returns null.
-     */
-    @Override
-    public Object getDataValue() {
-      return null;
-    }
-    /**
-     * Not supported: always throws.
-     * @throws UnsupportedOperationException always
-     */
-    public Chunk slice(int position, int limit) {
-      throw new UnsupportedOperationException();
-    }
-  }
-  /**
-   * A Chunk created with the no-arg Chunk constructor whose getSize() reports
-   * the value given at construction.
-   */
-  public static class FakeChunk extends Chunk {
-    /** The value returned by getSize(). */
-    private final int fixedSize;
-
-    public FakeChunk(int size) {
-      // The no-arg Chunk constructor is invoked implicitly.
-      this.fixedSize = size;
-    }
-
-    @Override
-    public int getSize() {
-      return this.fixedSize;
-    }
-  }
-  /**
-   * Simple stack structure.
-   * The chunk at the top of this stack is pointed to by topAddr.
-   * Each next chunk is found by reading a long from the data in the previous chunk.
-   * An address of 0L means it is the end of the stack.
-   * This class has a subtle race condition in it between
-   * one thread doing a poll, allocating data into the chunk returned by poll,
-   * and then offering it back. Meanwhile another thread did a poll of the same head chunk,
-   * read some of the allocating data as the "next" address and then did the compareAndSet
-   * call and it worked because the first thread had already put it back in.
-   * So this class should not be used. Instead use SyncChunkStack.
-   * 
-   * @author darrel
-   *
-   */
-  public static class BuggyConcurrentChunkStack {
-    // all uses of topAddr are done using topAddrUpdater
-    @SuppressWarnings("unused")
-    private volatile long topAddr;
-    private static final AtomicLongFieldUpdater<BuggyConcurrentChunkStack> topAddrUpdater = AtomicLongFieldUpdater.newUpdater(BuggyConcurrentChunkStack.class, "topAddr");
-    
-    /** Creates a stack whose top is addr; a non-zero addr is validated first. */
-    public BuggyConcurrentChunkStack(long addr) {
-      if (addr != 0L) validateAddress(addr);
-      this.topAddr = addr;
-    }
-    /** Creates an empty stack. */
-    public BuggyConcurrentChunkStack() {
-      this.topAddr = 0L;
-    }
-    /** Returns true when the top address is 0L. */
-    public boolean isEmpty() {
-      return topAddrUpdater.get(this) == 0L;
-    }
-    /** Pushes the chunk at address e, retrying until the compareAndSet of the top succeeds. */
-    public void offer(long e) {
-      assert e != 0;
-      validateAddress(e);
-      long curHead;
-      do {
-        curHead = topAddrUpdater.get(this);
-        // Link e to the head seen on this attempt before trying to publish it.
-        Chunk.setNext(e, curHead);
-      } while (!topAddrUpdater.compareAndSet(this, curHead, e));
-    }
-    /** Pops and returns the top chunk's address, or 0L if the stack is empty. */
-    public long poll() {
-      long result;
-      long newHead;
-      do {
-        result = topAddrUpdater.get(this);
-        if (result == 0L) return 0L;
-        // This read is the one the class comment warns about: it is not protected
-        // against the head chunk being polled and re-offered by another thread.
-        newHead = Chunk.getNext(result);
-        
-      } while (!topAddrUpdater.compareAndSet(this, result, newHead));
-      if (newHead != 0L) validateAddress(newHead);
-      return result;
-    }
-    /**
-     * Removes all the Chunks from this stack
-     * and returns the address of the first chunk.
-     * The caller owns all the Chunks after this call.
-     */
-    public long clear() {
-      long result;
-      do {
-        result = topAddrUpdater.get(this);
-        if (result == 0L) return 0L;
-      } while (!topAddrUpdater.compareAndSet(this, result, 0L));
-      return result;
-    }
-    /**
-     * Logs msg followed by the size of each chunk on the stack, one info line per chunk.
-     * If the top changes while walking, the walk restarts from the new top, so
-     * sizes already logged may be logged again.
-     */
-    public void logSizes(LogWriter lw, String msg) {
-      long headAddr = topAddrUpdater.get(this);
-      long addr;
-      boolean concurrentModDetected;
-      do {
-        concurrentModDetected = false;
-        addr = headAddr;
-        while (addr != 0L) {
-          int curSize = Chunk.getSize(addr);
-          addr = Chunk.getNext(addr);
-          long curHead = topAddrUpdater.get(this);
-          if (curHead != headAddr) {
-            headAddr = curHead;
-            concurrentModDetected = true;
-            // Someone added or removed from the stack.
-            // So we break out of the inner loop and start
-            // again at the new head.
-            break;
-          }
-          // TODO construct a single log msg
-          // that gets reset on the concurrent mod.
-          lw.info(msg + curSize);
-        }
-      } while (concurrentModDetected);
-    }
-    /**
-     * Returns the sum of the sizes of all chunks on the stack.
-     * If the top changes while walking, the sum is reset and the walk restarts from the new top.
-     */
-    public long computeTotalSize() {
-      long result;
-      long headAddr = topAddrUpdater.get(this);
-      long addr;
-      boolean concurrentModDetected;
-      do {
-        concurrentModDetected = false;
-        result = 0;
-        addr = headAddr;
-        while (addr != 0L) {
-          result += Chunk.getSize(addr);
-          addr = Chunk.getNext(addr);
-          long curHead = topAddrUpdater.get(this);
-          if (curHead != headAddr) {
-            headAddr = curHead;
-            concurrentModDetected = true;
-            // Someone added or removed from the stack.
-            // So we break out of the inner loop and start
-            // again at the new head.
-            break;
-          }
-        }
-      } while (concurrentModDetected);
-      return result;
-    }
-  }
-  /**
-   * A stack of chunk addresses. topAddr is the address of the top chunk and each
-   * chunk stores the address of the next one (see Chunk.setNext/getNext); 0L marks
-   * the end. All writes to topAddr happen while synchronized on this stack;
-   * isEmpty, logSizes and computeTotalSize read it without synchronizing.
-   */
-  public static class SyncChunkStack {
-    // Ok to read without sync but must be synced on write
-    private volatile long topAddr;
-    
-    /** Creates a stack whose top is addr; a non-zero addr is validated first. */
-    public SyncChunkStack(long addr) {
-      if (addr != 0L) validateAddress(addr);
-      this.topAddr = addr;
-    }
-    /** Creates an empty stack. */
-    public SyncChunkStack() {
-      this.topAddr = 0L;
-    }
-    /** Returns true when the top address is 0L. */
-    public boolean isEmpty() {
-      return this.topAddr == 0L;
-    }
-    /** Pushes the chunk at address e, which must be non-zero and pass validateAddress. */
-    public void offer(long e) {
-      assert e != 0;
-      validateAddress(e);
-      synchronized (this) {
-        // Link e to the current top, then make e the new top.
-        Chunk.setNext(e, this.topAddr);
-        this.topAddr = e;
-      }
-    }
-    /** Pops and returns the top chunk's address, or 0L if the stack is empty. */
-    public long poll() {
-      long result;
-      synchronized (this) {
-        result = this.topAddr;
-        if (result != 0L) {
-          this.topAddr = Chunk.getNext(result);
-        }
-      }
-      return result;
-    }
-    /**
-     * Removes all the Chunks from this stack
-     * and returns the address of the first chunk.
-     * The caller owns all the Chunks after this call.
-     */
-    public long clear() {
-      long result;
-      synchronized (this) {
-        result = this.topAddr;
-        if (result != 0L) {
-          this.topAddr = 0L;
-        }
-      }
-      return result;
-    }
-    /**
-     * Logs msg followed by the size of each chunk on the stack, one info line per chunk.
-     * The walk is not synchronized; if the top changes while walking, the walk restarts
-     * from the new top, so sizes already logged may be logged again.
-     */
-    public void logSizes(LogWriter lw, String msg) {
-      long headAddr = this.topAddr;
-      long addr;
-      boolean concurrentModDetected;
-      do {
-        concurrentModDetected = false;
-        addr = headAddr;
-        while (addr != 0L) {
-          int curSize = Chunk.getSize(addr);
-          addr = Chunk.getNext(addr);
-          long curHead = this.topAddr;
-          if (curHead != headAddr) {
-            headAddr = curHead;
-            concurrentModDetected = true;
-            // Someone added or removed from the stack.
-            // So we break out of the inner loop and start
-            // again at the new head.
-            break;
-          }
-          // TODO construct a single log msg
-          // that gets reset on the concurrent mod.
-          lw.info(msg + curSize);
-        }
-      } while (concurrentModDetected);
-    }
-    /**
-     * Returns the sum of the sizes of all chunks on the stack.
-     * The walk is not synchronized; if the top changes while walking, the sum is
-     * reset and the walk restarts from the new top.
-     */
-    public long computeTotalSize() {
-      long result;
-      long headAddr = this.topAddr;
-      long addr;
-      boolean concurrentModDetected;
-      do {
-        concurrentModDetected = false;
-        result = 0;
-        addr = headAddr;
-        while (addr != 0L) {
-          result += Chunk.getSize(addr);
-          addr = Chunk.getNext(addr);
-          long curHead = this.topAddr;
-          if (curHead != headAddr) {
-            headAddr = curHead;
-            concurrentModDetected = true;
-            // Someone added or removed from the stack.
-            // So we break out of the inner loop and start
-            // again at the new head.
-            break;
-          }
-        }
-      } while (concurrentModDetected);
-      return result;
-    }
-  }
-  
-  /**
-   * Validates addr alone by calling validateAddressAndSize with -1,
-   * the value used when the caller has no size to provide.
-   */
-  private static void validateAddress(long addr) {
-    final int noSize = -1;
-    validateAddressAndSize(addr, noSize);
-  }
-  
-  private static void validateAddressAndSize(long addr, int size) {
-    // if the caller does not have a "size" to provide then use -1
-    if ((addr & 7) != 0) {
-      StringBuilder sb = new StringBuilder();
-      sb.append("address was not 8 byte aligned: 0x").append(Long.toString(addr, 16));
-      SimpleMem

<TRUNCATED>