You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/02/23 21:24:04 UTC

[68/94] [abbrv] incubator-geode git commit: GEODE-917: Merge branch 'feature/GEODE-917' into develop

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/FreeListManager.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/FreeListManager.java
index 0000000,10e4148..a716f14
mode 000000,100644..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/FreeListManager.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/FreeListManager.java
@@@ -1,0 -1,821 +1,920 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package com.gemstone.gemfire.internal.offheap;
+ 
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.Comparator;
+ import java.util.List;
+ import java.util.NavigableSet;
+ import java.util.concurrent.ConcurrentSkipListSet;
+ import java.util.concurrent.CopyOnWriteArrayList;
+ import java.util.concurrent.atomic.AtomicInteger;
+ import java.util.concurrent.atomic.AtomicLong;
+ import java.util.concurrent.atomic.AtomicReferenceArray;
+ 
+ import com.gemstone.gemfire.LogWriter;
+ import com.gemstone.gemfire.OutOfOffHeapMemoryException;
+ import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
 -import com.gemstone.gemfire.internal.offheap.MemoryBlock.State;
+ 
+ /**
+  * Manages the free lists for a SimpleMemoryAllocatorImpl
+  */
+ public class FreeListManager {
 -  final private AtomicReferenceArray<SyncChunkStack> tinyFreeLists = new AtomicReferenceArray<SyncChunkStack>(SimpleMemoryAllocatorImpl.TINY_FREE_LIST_COUNT);
++  /** The MemoryChunks that this allocator is managing by allocating smaller chunks of them.
++   * The contents of this array never change.
++   */
++  private final AddressableMemoryChunk[] slabs;
++  private final long totalSlabSize;
++  
++  final private AtomicReferenceArray<SyncChunkStack> tinyFreeLists = new AtomicReferenceArray<SyncChunkStack>(TINY_FREE_LIST_COUNT);
+   // hugeChunkSet is sorted by chunk size in ascending order. It will only contain chunks larger than MAX_TINY.
 -  private final ConcurrentSkipListSet<Chunk> hugeChunkSet = new ConcurrentSkipListSet<Chunk>();
++  private final ConcurrentSkipListSet<ObjectChunk> hugeChunkSet = new ConcurrentSkipListSet<ObjectChunk>();
+   private final AtomicLong allocatedSize = new AtomicLong(0L);
+ 
+   private int getNearestTinyMultiple(int size) {
 -    return (size-1)/SimpleMemoryAllocatorImpl.TINY_MULTIPLE;
++    return (size-1)/TINY_MULTIPLE;
+   }
 -  List<Chunk> getLiveChunks() {
 -    ArrayList<Chunk> result = new ArrayList<Chunk>();
 -    UnsafeMemoryChunk[] slabs = this.ma.getSlabs();
++  List<ObjectChunk> getLiveChunks() {
++    ArrayList<ObjectChunk> result = new ArrayList<ObjectChunk>();
+     for (int i=0; i < slabs.length; i++) {
+       getLiveChunks(slabs[i], result);
+     }
+     return result;
+   }
 -  private void getLiveChunks(UnsafeMemoryChunk slab, List<Chunk> result) {
++  private void getLiveChunks(AddressableMemoryChunk slab, List<ObjectChunk> result) {
+     long addr = slab.getMemoryAddress();
 -    while (addr <= (slab.getMemoryAddress() + slab.getSize() - Chunk.MIN_CHUNK_SIZE)) {
++    while (addr <= (slab.getMemoryAddress() + slab.getSize() - ObjectChunk.MIN_CHUNK_SIZE)) {
+       Fragment f = isAddrInFragmentFreeSpace(addr);
+       if (f != null) {
+         addr = f.getMemoryAddress() + f.getSize();
+       } else {
 -        int curChunkSize = Chunk.getSize(addr);
 -        int refCount = Chunk.getRefCount(addr);
++        int curChunkSize = ObjectChunk.getSize(addr);
++        int refCount = ObjectChunk.getRefCount(addr);
+         if (refCount > 0) {
 -          result.add(this.ma.chunkFactory.newChunk(addr));
++          result.add(new ObjectChunk(addr));
+         }
+         addr += curChunkSize;
+       }
+     }
+   }
+   /**
+    * If addr is in the free space of a fragment then return that fragment; otherwise return null.
+    */
+   private Fragment isAddrInFragmentFreeSpace(long addr) {
+     for (Fragment f: this.fragmentList) {
+       if (addr >= (f.getMemoryAddress() + f.getFreeIndex()) && addr < (f.getMemoryAddress() + f.getSize())) {
+         return f;
+       }
+     }
+     return null;
+   }
+   public long getUsedMemory() {
+     return this.allocatedSize.get();
+   }
+   public long getFreeMemory() {
 -    return this.ma.getTotalMemory() - getUsedMemory();
++    return getTotalMemory() - getUsedMemory();
+   }
+   long getFreeFragmentMemory() {
+     long result = 0;
+     for (Fragment f: this.fragmentList) {
+       int freeSpace = f.freeSpace();
 -      if (freeSpace >= Chunk.MIN_CHUNK_SIZE) {
++      if (freeSpace >= ObjectChunk.MIN_CHUNK_SIZE) {
+         result += freeSpace;
+       }
+     }
+     return result;
+   }
+   long getFreeTinyMemory() {
+     long tinyFree = 0;
+     for (int i=0; i < this.tinyFreeLists.length(); i++) {
+       SyncChunkStack cl = this.tinyFreeLists.get(i);
+       if (cl != null) {
+         tinyFree += cl.computeTotalSize();
+       }
+     }
+     return tinyFree;
+   }
+   long getFreeHugeMemory() {
+     long hugeFree = 0;
 -    for (Chunk c: this.hugeChunkSet) {
++    for (ObjectChunk c: this.hugeChunkSet) {
+       hugeFree += c.getSize();
+     }
+     return hugeFree;
+   }
+ 
+   /**
+    * The id of the last fragment we allocated from.
+    */
+   private final AtomicInteger lastFragmentAllocation = new AtomicInteger(0);
+   private final CopyOnWriteArrayList<Fragment> fragmentList;
+   private final SimpleMemoryAllocatorImpl ma;
+ 
 -  public FreeListManager(SimpleMemoryAllocatorImpl ma) {
++  public FreeListManager(SimpleMemoryAllocatorImpl ma, final AddressableMemoryChunk[] slabs) {
+     this.ma = ma;
 -    UnsafeMemoryChunk[] slabs = ma.getSlabs();
++    this.slabs = slabs;
++    long total = 0;
+     Fragment[] tmp = new Fragment[slabs.length];
+     for (int i=0; i < slabs.length; i++) {
 -      tmp[i] = new Fragment(slabs[i].getMemoryAddress(), slabs[i].getSize());
++      tmp[i] = createFragment(slabs[i].getMemoryAddress(), slabs[i].getSize());
++      total += slabs[i].getSize();
+     }
+     this.fragmentList = new CopyOnWriteArrayList<Fragment>(tmp);
++    this.totalSlabSize = total;
+ 
 -    if(ma.validateMemoryWithFill) {
 -      fillFragments();
 -    }
++    fillFragments();
+   }
+ 
+   /**
 -   * Fills all fragments with a fill used for data integrity validation.
++   * Create and return a Fragment.
++   * This method exists so that tests can override it.
++   */
++  protected Fragment createFragment(long addr, int size) {
++    return new Fragment(addr, size);
++  }
++  
++  /**
++   * Fills all fragments with a fill used for data integrity validation 
++   * if fill validation is enabled.
+    */
+   private void fillFragments() {
++    if (!this.validateMemoryWithFill) {
++      return;
++    }
+     for(Fragment fragment : this.fragmentList) {
+       fragment.fill();
+     }
+   }
+ 
+   /**
+    * Allocate a chunk of memory of at least the given size.
+    * The basic algorithm is:
+    * 1. Look for a previously allocated and freed chunk close to the size requested.
+    * 2. See if the original chunk is big enough to split. If so do so.
+    * 3. Look for a previously allocated and freed chunk of any size larger than the one requested.
+    *    If we find one split it.
+    * <p>
+    * It might be better not to include step 3 since we expect and freed chunk to be reallocated in the future.
+    * Maybe it would be better for 3 to look for adjacent free blocks that can be merged together.
+    * For now we will just try 1 and 2 and then report out of mem.
+    * @param size minimum bytes the returned chunk must have.
 -   * @param chunkType TODO
+    * @return the allocated chunk
+    * @throws IllegalStateException if a chunk can not be allocated.
+    */
+   @SuppressWarnings("synthetic-access")
 -  public Chunk allocate(int size, ChunkType chunkType) {
 -    Chunk result = null;
 -    {
 -      assert size > 0;
 -      if (chunkType == null) {
 -        chunkType = GemFireChunk.TYPE;
 -      }
 -      result = basicAllocate(size, true, chunkType);
 -      result.setDataSize(size);
 -    }
 -    this.ma.stats.incObjects(1);
 -    int resultSize = result.getSize();
 -    this.allocatedSize.addAndGet(resultSize);
 -    this.ma.stats.incUsedMemory(resultSize);
 -    this.ma.stats.incFreeMemory(-resultSize);
++  public ObjectChunk allocate(int size) {
++    assert size > 0;
++    
++    ObjectChunk result = basicAllocate(size, true);
++
++    result.setDataSize(size);
++    this.allocatedSize.addAndGet(result.getSize());
+     result.initializeUseCount();
 -    this.ma.notifyListeners();
+ 
+     return result;
+   }
+ 
 -  private Chunk basicAllocate(int size, boolean useSlabs, ChunkType chunkType) {
++  private ObjectChunk basicAllocate(int size, boolean useSlabs) {
+     if (useSlabs) {
+       // Every object stored off heap has a header so we need
+       // to adjust the size so that the header gets allocated.
+       // If useSlabs is false then the incoming size has already
+       // been adjusted.
 -      size += Chunk.OFF_HEAP_HEADER_SIZE;
++      size += ObjectChunk.OFF_HEAP_HEADER_SIZE;
+     }
 -    if (size <= SimpleMemoryAllocatorImpl.MAX_TINY) {
 -      return allocateTiny(size, useSlabs, chunkType);
++    if (size <= MAX_TINY) {
++      return allocateTiny(size, useSlabs);
+     } else {
 -      return allocateHuge(size, useSlabs, chunkType);
++      return allocateHuge(size, useSlabs);
+     }
+   }
+ 
 -  private Chunk allocateFromFragments(int chunkSize, ChunkType chunkType) {
++  private ObjectChunk allocateFromFragments(int chunkSize) {
+     do {
+       final int lastAllocationId = this.lastFragmentAllocation.get();
+       for (int i=lastAllocationId; i < this.fragmentList.size(); i++) {
 -        Chunk result = allocateFromFragment(i, chunkSize, chunkType);
++        ObjectChunk result = allocateFromFragment(i, chunkSize);
+         if (result != null) {
+           return result;
+         }
+       }
+       for (int i=0; i < lastAllocationId; i++) {
 -        Chunk result = allocateFromFragment(i, chunkSize, chunkType);
++        ObjectChunk result = allocateFromFragment(i, chunkSize);
+         if (result != null) {
+           return result;
+         }
+       }
+     } while (compact(chunkSize));
+     // We tried all the fragments and didn't find any free memory.
+     logOffHeapState(chunkSize);
+     final OutOfOffHeapMemoryException failure = new OutOfOffHeapMemoryException("Out of off-heap memory. Could not allocate size of " + chunkSize);
+     try {
+       throw failure;
+     } finally {
 -      this.ma.ooohml.outOfOffHeapMemory(failure);
++      this.ma.getOutOfOffHeapMemoryListener().outOfOffHeapMemory(failure);
+     }
+   }
+ 
+   private void logOffHeapState(int chunkSize) {
+     if (InternalDistributedSystem.getAnyInstance() != null) {
+       LogWriter lw = InternalDistributedSystem.getAnyInstance().getLogWriter();
 -      lw.info("OutOfOffHeapMemory allocating size of " + chunkSize + ". allocated=" + this.allocatedSize.get() + " compactions=" + this.compactCount.get() + " objects=" + this.ma.stats.getObjects() + " free=" + this.ma.stats.getFreeMemory() + " fragments=" + this.ma.stats.getFragments() + " largestFragment=" + this.ma.stats.getLargestFragment() + " fragmentation=" + this.ma.stats.getFragmentation());
 -      logFragmentState(lw);
 -      logTinyState(lw);
 -      logHugeState(lw);
++      logOffHeapState(lw, chunkSize);
+     }
+   }
+ 
++  void logOffHeapState(LogWriter lw, int chunkSize) {
++    OffHeapMemoryStats stats = this.ma.getStats();
++    lw.info("OutOfOffHeapMemory allocating size of " + chunkSize + ". allocated=" + this.allocatedSize.get() + " compactions=" + this.compactCount.get() + " objects=" + stats.getObjects() + " free=" + stats.getFreeMemory() + " fragments=" + stats.getFragments() + " largestFragment=" + stats.getLargestFragment() + " fragmentation=" + stats.getFragmentation());
++    logFragmentState(lw);
++    logTinyState(lw);
++    logHugeState(lw);
++  }
++
+   private void logHugeState(LogWriter lw) {
 -    for (Chunk c: this.hugeChunkSet) {
++    for (ObjectChunk c: this.hugeChunkSet) {
+       lw.info("Free huge of size " + c.getSize());
+     }
+   }
+   private void logTinyState(LogWriter lw) {
+     for (int i=0; i < this.tinyFreeLists.length(); i++) {
+       SyncChunkStack cl = this.tinyFreeLists.get(i);
+       if (cl != null) {
+         cl.logSizes(lw, "Free tiny of size ");
+       }
+     }
+   }
+   private void logFragmentState(LogWriter lw) {
+     for (Fragment f: this.fragmentList) {
+       int freeSpace = f.freeSpace();
+       if (freeSpace > 0) {
+         lw.info("Fragment at " + f.getMemoryAddress() + " of size " + f.getSize() + " has " + freeSpace + " bytes free.");
+       }
+     }
+   }
+ 
 -  private final AtomicInteger compactCount = new AtomicInteger();
++  protected final AtomicInteger compactCount = new AtomicInteger();
++  /*
++   * Set this to "true" to perform data integrity checks on allocated and reused Chunks.  This may clobber 
++   * performance so turn on only when necessary.
++   */
++  final boolean validateMemoryWithFill = Boolean.getBoolean("gemfire.validateOffHeapWithFill");
++  /**
++   * Every allocated chunk smaller than TINY_MULTIPLE*TINY_FREE_LIST_COUNT will allocate a chunk of memory that is a multiple of this value.
++   * Sizes are always rounded up to the next multiple of this constant
++   * so internal fragmentation will be limited to TINY_MULTIPLE-1 bytes per allocation
++   * and on average will be TINY_MULTIPLE/2 given a random distribution of size requests.
++   * This does not account for the additional internal fragmentation caused by the off-heap header
++   * which currently is always 8 bytes.
++   */
++  public final static int TINY_MULTIPLE = Integer.getInteger("gemfire.OFF_HEAP_ALIGNMENT", 8);
++  static {
++    verifyOffHeapAlignment(TINY_MULTIPLE);
++  }
++  /**
++   * Number of free lists to keep for tiny allocations.
++   */
++  public final static int TINY_FREE_LIST_COUNT = Integer.getInteger("gemfire.OFF_HEAP_FREE_LIST_COUNT", 16384);
++  static {
++    verifyOffHeapFreeListCount(TINY_FREE_LIST_COUNT);
++  }
++  /**
++   * How many unused bytes are allowed in a huge memory allocation.
++   */
++  public final static int HUGE_MULTIPLE = 256;
++  static {
++    verifyHugeMultiple(HUGE_MULTIPLE);
++  }
++  public final static int MAX_TINY = TINY_MULTIPLE*TINY_FREE_LIST_COUNT;
+   /**
+    * Compacts memory and returns true if enough memory to allocate chunkSize
+    * is freed. Otherwise returns false;
+    * TODO OFFHEAP: what should be done about contiguous chunks that end up being bigger than 2G?
+    * Currently if we are given slabs bigger than 2G or that just happen to be contiguous and add
+    * up to 2G then the compactor may unify them together into a single Chunk and our 32-bit chunkSize
+    * field will overflow. This code needs to detect this and just create a chunk of 2G and then start
+    * a new one.
+    * Or to prevent it from happening we could just check the incoming slabs and throw away a few bytes
+    * to keep them from being contiguous.
+    */
 -  private boolean compact(int chunkSize) {
++  boolean compact(int chunkSize) {
+     final long startCompactionTime = this.ma.getStats().startCompaction();
+     final int countPreSync = this.compactCount.get();
++    afterCompactCountFetched();
+     try {
+       synchronized (this) {
+         if (this.compactCount.get() != countPreSync) {
+           // someone else did a compaction while we waited on the sync.
+           // So just return true causing the caller to retry the allocation.
+           return true;
+         }
+         ArrayList<SyncChunkStack> freeChunks = new ArrayList<SyncChunkStack>();
+         collectFreeChunks(freeChunks);
+         final int SORT_ARRAY_BLOCK_SIZE = 128;
+         long[] sorted = new long[SORT_ARRAY_BLOCK_SIZE];
+         int sortedSize = 0;
+         boolean result = false;
+         int largestFragment = 0;
+         for (SyncChunkStack l: freeChunks) {
+           long addr = l.poll();
+           while (addr != 0) {
+             int idx = Arrays.binarySearch(sorted, 0, sortedSize, addr);
 -            //System.out.println("DEBUG addr=" + addr + " size=" + Chunk.getSize(addr) + " idx="+idx + " sortedSize=" + sortedSize);
 -            if (idx >= 0) {
 -              throw new IllegalStateException("duplicate memory address found during compaction!");
 -            }
+             idx = -idx;
+             idx--;
+             if (idx == sortedSize) {
+               // addr is > everything in the array
+               if (sortedSize == 0) {
+                 // nothing was in the array
+                 sorted[0] = addr;
+                 sortedSize++;
+               } else {
+                 // see if we can conflate into sorted[idx]
+                 long lowAddr = sorted[idx-1];
 -                int lowSize = Chunk.getSize(lowAddr);
++                int lowSize = ObjectChunk.getSize(lowAddr);
+                 if (lowAddr + lowSize == addr) {
+                   // append the addr chunk to lowAddr
 -                  Chunk.setSize(lowAddr, lowSize + Chunk.getSize(addr));
++                  ObjectChunk.setSize(lowAddr, lowSize + ObjectChunk.getSize(addr));
+                 } else {
+                   if (sortedSize >= sorted.length) {
+                     long[] newSorted = new long[sorted.length+SORT_ARRAY_BLOCK_SIZE];
+                     System.arraycopy(sorted, 0, newSorted, 0, sorted.length);
+                     sorted = newSorted;
+                   }
+                   sortedSize++;
+                   sorted[idx] = addr;
+                 }
+               }
+             } else {
 -              int addrSize = Chunk.getSize(addr);
++              int addrSize = ObjectChunk.getSize(addr);
+               long highAddr = sorted[idx];
+               if (addr + addrSize == highAddr) {
+                 // append highAddr chunk to addr
 -                Chunk.setSize(addr, addrSize + Chunk.getSize(highAddr));
++                ObjectChunk.setSize(addr, addrSize + ObjectChunk.getSize(highAddr));
+                 sorted[idx] = addr;
+               } else {
+                 boolean insert = idx==0;
+                 if (!insert) {
+                   long lowAddr = sorted[idx-1];
+                   //                  if (lowAddr == 0L) {
+                   //                    long[] tmp = Arrays.copyOf(sorted, sortedSize);
+                   //                    throw new IllegalStateException("addr was zero at idx=" + (idx-1) + " sorted="+ Arrays.toString(tmp));
+                   //                  }
 -                  int lowSize = Chunk.getSize(lowAddr);
++                  int lowSize = ObjectChunk.getSize(lowAddr);
+                   if (lowAddr + lowSize == addr) {
+                     // append the addr chunk to lowAddr
 -                    Chunk.setSize(lowAddr, lowSize + addrSize);
++                    ObjectChunk.setSize(lowAddr, lowSize + addrSize);
+                   } else {
+                     insert = true;
+                   }
+                 }
+                 if (insert) {
+                   if (sortedSize >= sorted.length) {
+                     long[] newSorted = new long[sorted.length+SORT_ARRAY_BLOCK_SIZE];
+                     System.arraycopy(sorted, 0, newSorted, 0, idx);
+                     newSorted[idx] = addr;
+                     System.arraycopy(sorted, idx, newSorted, idx+1, sortedSize-idx);
+                     sorted = newSorted;
+                   } else {
+                     System.arraycopy(sorted, idx, sorted, idx+1, sortedSize-idx);
+                     sorted[idx] = addr;
+                   }
+                   sortedSize++;
+                 }
+               }
+             }
+             addr = l.poll();
+           }
+         }
+         for (int i=sortedSize-1; i > 0; i--) {
+           long addr = sorted[i];
+           long lowAddr = sorted[i-1];
 -          int lowSize = Chunk.getSize(lowAddr);
++          int lowSize = ObjectChunk.getSize(lowAddr);
+           if (lowAddr + lowSize == addr) {
+             // append addr chunk to lowAddr
 -            Chunk.setSize(lowAddr, lowSize + Chunk.getSize(addr));
++            ObjectChunk.setSize(lowAddr, lowSize + ObjectChunk.getSize(addr));
+             sorted[i] = 0L;
+           }
+         }
+         this.lastFragmentAllocation.set(0);
+         ArrayList<Fragment> tmp = new ArrayList<Fragment>();
+         for (int i=sortedSize-1; i >= 0; i--) {
+           long addr = sorted[i];
+           if (addr == 0L) continue;
 -          int addrSize = Chunk.getSize(addr);
 -          Fragment f = new Fragment(addr, addrSize);
++          int addrSize = ObjectChunk.getSize(addr);
++          Fragment f = createFragment(addr, addrSize);
+           if (addrSize >= chunkSize) {
+             result = true;
+           }
+           if (addrSize > largestFragment) {
+             largestFragment = addrSize;
+             // TODO it might be better to sort them biggest first
+             tmp.add(0, f);
+           } else {
+             tmp.add(f);
+           }
+         }
+         this.fragmentList.addAll(tmp);
+ 
 -        // Reinitialize fragments with fill pattern data
 -        if(this.ma.validateMemoryWithFill) {
 -          fillFragments();
 -        }
++        fillFragments();
+ 
+         // Signal any waiters that a compaction happened.
+         this.compactCount.incrementAndGet();
+ 
+         this.ma.getStats().setLargestFragment(largestFragment);
+         this.ma.getStats().setFragments(tmp.size());        
 -        updateFragmentation();
++        updateFragmentation(largestFragment);
+ 
+         return result;
+       } // sync
+     } finally {
+       this.ma.getStats().endCompaction(startCompactionTime);
+     }
+   }
+ 
 -  private void updateFragmentation() {      
 -    long freeSize = this.ma.getStats().getFreeMemory();
++  /**
++   * Unit tests override this method to get better test coverage
++   */
++  protected void afterCompactCountFetched() {
++  }
++  
++  static void verifyOffHeapAlignment(int tinyMultiple) {
++    if (tinyMultiple <= 0 || (tinyMultiple & 3) != 0) {
++      throw new IllegalStateException("gemfire.OFF_HEAP_ALIGNMENT must be a multiple of 8.");
++    }
++    if (tinyMultiple > 256) {
++      // this restriction exists because of the dataSize field in the object header.
++      throw new IllegalStateException("gemfire.OFF_HEAP_ALIGNMENT must be <= 256 and a multiple of 8.");
++    }
++  }
++  static void verifyOffHeapFreeListCount(int tinyFreeListCount) {
++    if (tinyFreeListCount <= 0) {
++      throw new IllegalStateException("gemfire.OFF_HEAP_FREE_LIST_COUNT must be >= 1.");
++    }
++  }
++  static void verifyHugeMultiple(int hugeMultiple) {
++    if (hugeMultiple > 256 || hugeMultiple < 0) {
++      // this restriction exists because of the dataSize field in the object header.
++      throw new IllegalStateException("HUGE_MULTIPLE must be >= 0 and <= 256 but it was " + hugeMultiple);
++    }
++  }
++  
++  private void updateFragmentation(long largestFragment) {      
++    long freeSize = getFreeMemory();
+ 
+     // Calculate free space fragmentation only if there is free space available.
+     if(freeSize > 0) {
 -      long largestFragment = this.ma.getStats().getLargestFragment();
+       long numerator = freeSize - largestFragment;
+ 
+       double percentage = (double) numerator / (double) freeSize;
+       percentage *= 100d;
+ 
+       int wholePercentage = (int) Math.rint(percentage);
+       this.ma.getStats().setFragmentation(wholePercentage);
+     } else {
+       // No free space? Then we have no free space fragmentation.
+       this.ma.getStats().setFragmentation(0);
+     }
+   }
+ 
+   private void collectFreeChunks(List<SyncChunkStack> l) {
+     collectFreeFragmentChunks(l);
+     collectFreeHugeChunks(l);
+     collectFreeTinyChunks(l);
+   }
++  List<Fragment> getFragmentList() {
++    return this.fragmentList;
++  }
+   private void collectFreeFragmentChunks(List<SyncChunkStack> l) {
+     if (this.fragmentList.size() == 0) return;
+     SyncChunkStack result = new SyncChunkStack();
+     for (Fragment f: this.fragmentList) {
+       int offset;
+       int diff;
+       do {
+         offset = f.getFreeIndex();
+         diff = f.getSize() - offset;
 -      } while (diff >= Chunk.MIN_CHUNK_SIZE && !f.allocate(offset, offset+diff));
 -      if (diff < Chunk.MIN_CHUNK_SIZE) {
 -        if (diff > 0) {
 -          SimpleMemoryAllocatorImpl.logger.debug("Lost memory of size {}", diff);
 -        }
 -        // fragment is too small to turn into a chunk
 -        // TODO we need to make sure this never happens
 -        // by keeping sizes rounded. I think I did this
 -        // by introducing MIN_CHUNK_SIZE and by rounding
 -        // the size of huge allocations.
++      } while (diff >= ObjectChunk.MIN_CHUNK_SIZE && !f.allocate(offset, offset+diff));
++      if (diff < ObjectChunk.MIN_CHUNK_SIZE) {
++        // If diff > 0 then that memory will be lost during compaction.
++        // This should never happen since we keep the sizes rounded
++        // based on MIN_CHUNK_SIZE.
++        assert diff == 0;
++        // The current fragment is completely allocated so just skip it.
+         continue;
+       }
+       long chunkAddr = f.getMemoryAddress()+offset;
 -      Chunk.setSize(chunkAddr, diff);
++      ObjectChunk.setSize(chunkAddr, diff);
+       result.offer(chunkAddr);
+     }
+     // All the fragments have been turned in to chunks so now clear them
+     // The compaction will create new fragments.
+     this.fragmentList.clear();
+     if (!result.isEmpty()) {
+       l.add(result);
+     }
+   }
+   private void collectFreeTinyChunks(List<SyncChunkStack> l) {
+     for (int i=0; i < this.tinyFreeLists.length(); i++) {
+       SyncChunkStack cl = this.tinyFreeLists.get(i);
+       if (cl != null) {
+         long head = cl.clear();
+         if (head != 0L) {
+           l.add(new SyncChunkStack(head));
+         }
+       }
+     }
+   }
+   private void collectFreeHugeChunks(List<SyncChunkStack> l) {
 -    Chunk c = this.hugeChunkSet.pollFirst();
++    ObjectChunk c = this.hugeChunkSet.pollFirst();
+     SyncChunkStack result = null;
+     while (c != null) {
+       if (result == null) {
+         result = new SyncChunkStack();
+         l.add(result);
+       }
+       result.offer(c.getMemoryAddress());
+       c = this.hugeChunkSet.pollFirst();
+     }
+   }
+ 
 -  private Chunk allocateFromFragment(final int fragIdx, final int chunkSize, ChunkType chunkType) {
++  ObjectChunk allocateFromFragment(final int fragIdx, final int chunkSize) {
+     if (fragIdx >= this.fragmentList.size()) return null;
+     final Fragment fragment;
+     try {
+       fragment = this.fragmentList.get(fragIdx);
+     } catch (IndexOutOfBoundsException ignore) {
+       // A concurrent compaction can cause this.
+       return null;
+     }
+     boolean retryFragment;
+     do {
+       retryFragment = false;
+       int oldOffset = fragment.getFreeIndex();
+       int fragmentSize = fragment.getSize();
+       int fragmentFreeSize = fragmentSize - oldOffset;
+       if (fragmentFreeSize >= chunkSize) {
+         // this fragment has room
 -        // Try to allocate up to BATCH_SIZE more chunks from it
 -        int allocSize = chunkSize * SimpleMemoryAllocatorImpl.BATCH_SIZE;
 -        if (allocSize > fragmentFreeSize) {
 -          allocSize = (fragmentFreeSize / chunkSize) * chunkSize;
 -        }
 -        int newOffset = oldOffset + allocSize;
++        int newOffset = oldOffset + chunkSize;
+         int extraSize = fragmentSize - newOffset;
 -        if (extraSize < Chunk.MIN_CHUNK_SIZE) {
++        if (extraSize < ObjectChunk.MIN_CHUNK_SIZE) {
+           // include these last few bytes of the fragment in the allocation.
+           // If we don't then they will be lost forever.
+           // The extraSize bytes only apply to the first chunk we allocate (not the batch ones).
+           newOffset += extraSize;
+         } else {
+           extraSize = 0;
+         }
+         if (fragment.allocate(oldOffset, newOffset)) {
+           // We did the allocate!
+           this.lastFragmentAllocation.set(fragIdx);
 -          Chunk result = this.ma.chunkFactory.newChunk(fragment.getMemoryAddress()+oldOffset, chunkSize+extraSize, chunkType);
 -          allocSize -= chunkSize+extraSize;
 -          oldOffset += extraSize;
 -          while (allocSize > 0) {
 -            oldOffset += chunkSize;
 -            // we add the batch ones immediately to the freelist
 -            result.readyForFree();
 -            free(result.getMemoryAddress(), false);
 -            result = this.ma.chunkFactory.newChunk(fragment.getMemoryAddress()+oldOffset, chunkSize, chunkType);
 -            allocSize -= chunkSize;
 -          }
 -
 -          if(this.ma.validateMemoryWithFill) {
 -            result.validateFill();
 -          }
 -
++          ObjectChunk result = new ObjectChunk(fragment.getMemoryAddress()+oldOffset, chunkSize+extraSize);
++          checkDataIntegrity(result);
+           return result;
+         } else {
 -          // TODO OFFHEAP: if batch allocations are disabled should we not call basicAllocate here?
 -          // Since we know another thread did a concurrent alloc
 -          // that possibly did a batch check the free list again.
 -          Chunk result = basicAllocate(chunkSize, false, chunkType);
++          ObjectChunk result = basicAllocate(chunkSize, false);
+           if (result != null) {
+             return result;
+           }
+           retryFragment = true;
+         }
+       }
+     } while (retryFragment);
+     return null; // did not find enough free space in this fragment
+   }
+ 
+   private int round(int multiple, int value) {
+     return (int) ((((long)value + (multiple-1)) / multiple) * multiple);
+   }
 -  private Chunk allocateTiny(int size, boolean useFragments, ChunkType chunkType) {
 -    return basicAllocate(getNearestTinyMultiple(size), SimpleMemoryAllocatorImpl.TINY_MULTIPLE, 0, this.tinyFreeLists, useFragments, chunkType);
++  private ObjectChunk allocateTiny(int size, boolean useFragments) {
++    return basicAllocate(getNearestTinyMultiple(size), TINY_MULTIPLE, 0, this.tinyFreeLists, useFragments);
+   }
 -  private Chunk basicAllocate(int idx, int multiple, int offset, AtomicReferenceArray<SyncChunkStack> freeLists, boolean useFragments, ChunkType chunkType) {
++  private ObjectChunk basicAllocate(int idx, int multiple, int offset, AtomicReferenceArray<SyncChunkStack> freeLists, boolean useFragments) {
+     SyncChunkStack clq = freeLists.get(idx);
+     if (clq != null) {
+       long memAddr = clq.poll();
+       if (memAddr != 0) {
 -        Chunk result = this.ma.chunkFactory.newChunk(memAddr, chunkType);
 -
 -        // Data integrity check.
 -        if(this.ma.validateMemoryWithFill) {          
 -          result.validateFill();
 -        }
 -
 -        result.readyForAllocation(chunkType);
++        ObjectChunk result = new ObjectChunk(memAddr);
++        checkDataIntegrity(result);
++        result.readyForAllocation();
+         return result;
+       }
+     }
+     if (useFragments) {
 -      return allocateFromFragments(((idx+1)*multiple)+offset, chunkType);
++      return allocateFromFragments(((idx+1)*multiple)+offset);
+     } else {
+       return null;
+     }
+   }
 -  private Chunk allocateHuge(int size, boolean useFragments, ChunkType chunkType) {
++  private ObjectChunk allocateHuge(int size, boolean useFragments) {
+     // sizeHolder is a fake Chunk used to search our sorted hugeChunkSet.
 -    Chunk sizeHolder = new FakeChunk(size);
 -    NavigableSet<Chunk> ts = this.hugeChunkSet.tailSet(sizeHolder);
 -    Chunk result = ts.pollFirst();
++    ObjectChunk sizeHolder = new FakeChunk(size);
++    NavigableSet<ObjectChunk> ts = this.hugeChunkSet.tailSet(sizeHolder);
++    ObjectChunk result = ts.pollFirst();
+     if (result != null) {
 -      if (result.getSize() - (SimpleMemoryAllocatorImpl.HUGE_MULTIPLE - Chunk.OFF_HEAP_HEADER_SIZE) < size) {
++      if (result.getSize() - (HUGE_MULTIPLE - ObjectChunk.OFF_HEAP_HEADER_SIZE) < size) {
+         // close enough to the requested size; just return it.
 -
 -        // Data integrity check.
 -        if(this.ma.validateMemoryWithFill) {          
 -          result.validateFill();
 -        }
 -        if (chunkType.getSrcType() != Chunk.getSrcType(result.getMemoryAddress())) {
 -          // The java wrapper class that was cached in the huge chunk list is the wrong type.
 -          // So allocate a new one and garbage collect the old one.
 -          result = this.ma.chunkFactory.newChunk(result.getMemoryAddress(), chunkType);
 -        }
 -        result.readyForAllocation(chunkType);
++        checkDataIntegrity(result);
++        result.readyForAllocation();
+         return result;
+       } else {
+         this.hugeChunkSet.add(result);
+       }
+     }
+     if (useFragments) {
+       // We round it up to the next multiple of TINY_MULTIPLE to make
+       // sure we always have chunks allocated on an 8 byte boundary.
 -      return allocateFromFragments(round(SimpleMemoryAllocatorImpl.TINY_MULTIPLE, size), chunkType);
++      return allocateFromFragments(round(TINY_MULTIPLE, size));
+     } else {
+       return null;
+     }
+   }
+   
++  private void checkDataIntegrity(ObjectChunk data) {
++    if (this.validateMemoryWithFill) {
++      data.validateFill();
++    }
++  }
+   /**
+    * Used by the FreeListManager to easily search its
+    * ConcurrentSkipListSet. This is not a real chunk
+    * but only used for searching.
+    */
 -  private static class FakeChunk extends Chunk {
++  private static class FakeChunk extends ObjectChunk {
+     private final int size;
+     public FakeChunk(int size) {
+       super();
+       this.size = size;
+     }
+     @Override
+     public int getSize() {
+       return this.size;
+     }
+   }
+ 
+   @SuppressWarnings("synthetic-access")
+   public void free(long addr) {
++    if (this.validateMemoryWithFill) {
++      ObjectChunk.fill(addr);
++    }
++    
+     free(addr, true);
+   }
+ 
+   private void free(long addr, boolean updateStats) {
 -    int cSize = Chunk.getSize(addr);
++    int cSize = ObjectChunk.getSize(addr);
+     if (updateStats) {
 -      this.ma.stats.incObjects(-1);
++      OffHeapMemoryStats stats = this.ma.getStats();
++      stats.incObjects(-1);
+       this.allocatedSize.addAndGet(-cSize);
 -      this.ma.stats.incUsedMemory(-cSize);
 -      this.ma.stats.incFreeMemory(cSize);
++      stats.incUsedMemory(-cSize);
++      stats.incFreeMemory(cSize);
+       this.ma.notifyListeners();
+     }
 -    if (cSize <= SimpleMemoryAllocatorImpl.MAX_TINY) {
++    if (cSize <= MAX_TINY) {
+       freeTiny(addr, cSize);
+     } else {
+       freeHuge(addr, cSize);
+     }
+   }
+   private void freeTiny(long addr, int cSize) {
+     basicFree(addr, getNearestTinyMultiple(cSize), this.tinyFreeLists);
+   }
+   private void basicFree(long addr, int idx, AtomicReferenceArray<SyncChunkStack> freeLists) {
+     SyncChunkStack clq = freeLists.get(idx);
+     if (clq != null) {
+       clq.offer(addr);
+     } else {
 -      clq = new SyncChunkStack();
++      clq = createFreeListForEmptySlot(freeLists, idx);
+       clq.offer(addr);
+       if (!freeLists.compareAndSet(idx, null, clq)) {
+         clq = freeLists.get(idx);
+         clq.offer(addr);
+       }
+     }
 -
+   }
++  /**
++   * Tests override this method to simulate concurrent modification
++   */
++  protected SyncChunkStack createFreeListForEmptySlot(AtomicReferenceArray<SyncChunkStack> freeLists, int idx) {
++    return new SyncChunkStack();
++  }
++  
+   private void freeHuge(long addr, int cSize) {
 -    this.hugeChunkSet.add(this.ma.chunkFactory.newChunk(addr)); // TODO make this a collection of longs
++    this.hugeChunkSet.add(new ObjectChunk(addr)); // TODO make this a collection of longs
+   }
+ 
+   List<MemoryBlock> getOrderedBlocks() {
+     final List<MemoryBlock> value = new ArrayList<MemoryBlock>();
+     addBlocksFromFragments(this.fragmentList, value); // unused fragments
+     addBlocksFromChunks(getLiveChunks(), value); // used chunks
+     addBlocksFromChunks(this.hugeChunkSet, value);    // huge free chunks
+     addMemoryBlocks(getTinyFreeBlocks(), value);           // tiny free chunks
+     Collections.sort(value, 
+         new Comparator<MemoryBlock>() {
+           @Override
+           public int compare(MemoryBlock o1, MemoryBlock o2) {
+             return Long.valueOf(o1.getMemoryAddress()).compareTo(o2.getMemoryAddress());
+           }
+     });
+     return value;
+   }
+   private void addBlocksFromFragments(Collection<Fragment> src, List<MemoryBlock> dest) {
+     for (MemoryBlock block : src) {
+       dest.add(new MemoryBlockNode(this.ma, block));
+     }
+   }
+   
 -  private void addBlocksFromChunks(Collection<Chunk> src, List<MemoryBlock> dest) {
 -    for (Chunk chunk : src) {
++  private void addBlocksFromChunks(Collection<ObjectChunk> src, List<MemoryBlock> dest) {
++    for (ObjectChunk chunk : src) {
+       dest.add(new MemoryBlockNode(this.ma, chunk));
+     }
+   }
+   
+   private void addMemoryBlocks(Collection<MemoryBlock> src, List<MemoryBlock> dest) {
+     for (MemoryBlock block : src) {
+       dest.add(new MemoryBlockNode(this.ma, block));
+     }
+   }
+   
+   private List<MemoryBlock> getTinyFreeBlocks() {
+     final List<MemoryBlock> value = new ArrayList<MemoryBlock>();
+     final SimpleMemoryAllocatorImpl sma = this.ma;
+     for (int i = 0; i < this.tinyFreeLists.length(); i++) {
+       if (this.tinyFreeLists.get(i) == null) continue;
+       long addr = this.tinyFreeLists.get(i).getTopAddress();
+       while (addr != 0L) {
+         value.add(new MemoryBlockNode(sma, new TinyMemoryBlock(addr, i)));
 -        addr = Chunk.getNext(addr);
++        addr = ObjectChunk.getNext(addr);
+       }
+     }
+     return value;
+   }
+   List<MemoryBlock> getAllocatedBlocks() {
+     final List<MemoryBlock> value = new ArrayList<MemoryBlock>();
+     addBlocksFromChunks(getLiveChunks(), value); // used chunks
+     Collections.sort(value, 
+         new Comparator<MemoryBlock>() {
+           @Override
+           public int compare(MemoryBlock o1, MemoryBlock o2) {
+             return Long.valueOf(o1.getMemoryAddress()).compareTo(o2.getMemoryAddress());
+           }
+     });
+     return value;
+   }
+   /**
+    * Used to represent an address from a tiny free list as a MemoryBlock
+    */
+   private static final class TinyMemoryBlock implements MemoryBlock {
+     private final long address;
+     private final int freeListId;
+ 
+     private TinyMemoryBlock(long address, int freeListId) {
+       this.address = address;
+       this.freeListId = freeListId;
+     }
+ 
+     @Override
+     public State getState() {
+       return State.DEALLOCATED;
+     }
+ 
+     @Override
+     public long getMemoryAddress() {
+       return address;
+     }
+ 
+     @Override
+     public int getBlockSize() {
 -      return Chunk.getSize(address);
++      return ObjectChunk.getSize(address);
+     }
+ 
+     @Override
+     public MemoryBlock getNextBlock() {
+       throw new UnsupportedOperationException();
+     }
+ 
+     @Override
+     public int getSlabId() {
+       throw new UnsupportedOperationException();
+     }
+ 
+     @Override
+     public int getFreeListId() {
+       return freeListId;
+     }
+ 
+     @Override
+     public int getRefCount() {
+       return 0;
+     }
+ 
+     @Override
+     public String getDataType() {
+       return "N/A";
+     }
+ 
+     @Override
+     public boolean isSerialized() {
+       return false;
+     }
+ 
+     @Override
+     public boolean isCompressed() {
+       return false;
+     }
+ 
+     @Override
+     public Object getDataValue() {
+       return null;
+     }
+ 
+     @Override
 -    public ChunkType getChunkType() {
 -      return null;
 -    }
 -    
 -    @Override
+     public boolean equals(Object o) {
+       if (o instanceof TinyMemoryBlock) {
+         return getMemoryAddress() == ((TinyMemoryBlock) o).getMemoryAddress();
+       }
+       return false;
+     }
+     
+     @Override
+     public int hashCode() {
+       long value = this.getMemoryAddress();
+       return (int)(value ^ (value >>> 32));
+     }
+   }
++
++  long getTotalMemory() {
++    return this.totalSlabSize;
++  }
++  
++  void freeSlabs() {
++    for (int i=0; i < slabs.length; i++) {
++      slabs[i].release();
++    }
++  }
++  /**
++   * newSlabs will be non-null in unit tests.
++   * If the unit test gave us a different array
++   * of slabs then something is wrong because we
++   * are trying to reuse the old already allocated
++   * array which means that the new one will never
++   * be used. Note that this code does not bother
++   * comparing the contents of the arrays.
++   */
++  boolean okToReuse(AddressableMemoryChunk[] newSlabs) {
++    return newSlabs == null || newSlabs == this.slabs;
++  }
++  
++  int getLargestSlabSize() {
++    return this.slabs[0].getSize();
++  }
++  int findSlab(long addr) {
++    for (int i=0; i < this.slabs.length; i++) {
++      AddressableMemoryChunk slab = this.slabs[i];
++      long slabAddr = slab.getMemoryAddress();
++      if (addr >= slabAddr) {
++        if (addr < slabAddr + slab.getSize()) {
++          return i;
++        }
++      }
++    }
++    throw new IllegalStateException("could not find a slab for addr " + addr);
++  }
++  void getSlabDescriptions(StringBuilder sb) {
++    for (int i=0; i < slabs.length; i++) {
++      long startAddr = slabs[i].getMemoryAddress();
++      long endAddr = startAddr + slabs[i].getSize();
++      sb.append("[").append(Long.toString(startAddr, 16)).append("..").append(Long.toString(endAddr, 16)).append("] ");
++    }
++  }
++  boolean validateAddressAndSizeWithinSlab(long addr, int size) {
++    for (int i=0; i < slabs.length; i++) {
++      if (slabs[i].getMemoryAddress() <= addr && addr < (slabs[i].getMemoryAddress() + slabs[i].getSize())) {
++        // validate addr + size is within the same slab
++        if (size != -1) { // skip this check if size is -1
++          if (!(slabs[i].getMemoryAddress() <= (addr+size-1) && (addr+size-1) < (slabs[i].getMemoryAddress() + slabs[i].getSize()))) {
++            throw new IllegalStateException(" address 0x" + Long.toString(addr+size-1, 16) + " does not address the original slab memory");
++          }
++        }
++        return true;
++      }
++    }
++    return false;
++  }
++  
+ }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/MemoryAllocator.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/MemoryAllocator.java
index 0000000,0a014de..0c063ac
mode 000000,100644..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/MemoryAllocator.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/MemoryAllocator.java
@@@ -1,0 -1,64 +1,62 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package com.gemstone.gemfire.internal.offheap;
+ 
+ /**
+  * Basic contract for a heap that manages off heap memory. Any MemoryChunks allocated from a heap
+  * are returned to that heap when freed.
+  * 
+  * @author darrel
+  * @since 9.0
+  */
+ public interface MemoryAllocator {
+   /**
+    * @param size the size in bytes of the chunk of memory to allocate
 -   * @param chunkType TODO
+    * @return the allocated chunk of memory.
+    * @throws IllegalStateException if the heap does not have enough memory to grant the request
+    */
 -  public MemoryChunk allocate(int size, ChunkType chunkType);
++  public MemoryChunk allocate(int size);
+   
+   /**
+    * Allocates off heap memory for the given data and returns a MemoryChunk
+    * that is backed by this allocated memory and that contains the data.
+    * @param data the bytes of the data to put in the allocated CachedDeserializable
+    * @param isSerialized true if data contains a serialized object; false if it is an actual byte array.
+    * @param isCompressed true if data is compressed; false if it is uncompressed.
 -   * @param chunkType TODO
+    * @throws IllegalStateException if the heap does not have enough memory to grant the request
+    */
 -  public StoredObject allocateAndInitialize(byte[] data, boolean isSerialized, boolean isCompressed, ChunkType chunkType);
++  public StoredObject allocateAndInitialize(byte[] data, boolean isSerialized, boolean isCompressed);
+   
+   public long getFreeMemory();
+   
+   public long getUsedMemory();
+ 
+   public long getTotalMemory();
+ 
+   public OffHeapMemoryStats getStats();
+ 
+   /**
+    * This allocator will no longer be used so free up any system memory that belongs to it.
+    */
+   public void close();
+ 
+   public MemoryInspector getMemoryInspector();
+   
+   public void addMemoryUsageListener(MemoryUsageListener listener);
+   
+   public void removeMemoryUsageListener(MemoryUsageListener listener);
+ }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/MemoryBlock.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/MemoryBlock.java
index 0000000,3ad9283..d8cb80a
mode 000000,100755..100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/MemoryBlock.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/MemoryBlock.java
@@@ -1,0 -1,71 +1,70 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package com.gemstone.gemfire.internal.offheap;
+ 
+ 
+ /**
+  * Basic size and usage information about an off-heap memory block under
+  * inspection. For test validation only.
+  * 
+  * @author Kirk Lund
+  * @since 9.0
+  */
+ public interface MemoryBlock {
+ 
+   public enum State {
+     /** Unused fragment (not used and not in a free list) */
+     UNUSED, 
+     /** Allocated chunk currently in use */
+     ALLOCATED, 
+     /** Deallocated chunk currently in a free list */
+     DEALLOCATED 
+   }
+   
+   public State getState();
+   
+   /**
+    * Returns the unsafe memory address of the first byte of this block.
+    */
+   public long getMemoryAddress();
+   
+   /**
+    * Returns the size of this memory block in bytes.
+    */
+   public int getBlockSize();
+   
+   /**
+    * Returns the next memory block immediately after this one.
+    */
+   public MemoryBlock getNextBlock();
+   
+   /**
+    * Returns the identifier of which slab contains this block.
+    */
+   public int getSlabId();
+   
+   /**
+    * Returns the identifier of which free list contains this block.
+    */
+   public int getFreeListId();
+   
+   public int getRefCount();
+   public String getDataType();
 -  public ChunkType getChunkType();
+   public boolean isSerialized();
+   public boolean isCompressed();
+   public Object getDataValue();
+ }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/MemoryBlockNode.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/MemoryBlockNode.java
index 0000000,5c6182a..b41d429
mode 000000,100644..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/MemoryBlockNode.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/MemoryBlockNode.java
@@@ -1,0 -1,170 +1,162 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package com.gemstone.gemfire.internal.offheap;
+ 
+ import java.io.IOException;
+ import java.util.Arrays;
+ 
+ import com.gemstone.gemfire.DataSerializer;
+ import com.gemstone.gemfire.cache.CacheClosedException;
+ 
+ /**
+  * Basic implementation of MemoryBlock for test validation only.
+  */
+ public class MemoryBlockNode implements MemoryBlock {
+   private final SimpleMemoryAllocatorImpl ma;
+   private final MemoryBlock block;
+   MemoryBlockNode(SimpleMemoryAllocatorImpl ma, MemoryBlock block) {
+     this.ma = ma;
+     this.block = block;
+   }
+   @Override
+   public State getState() {
+     return this.block.getState();
+   }
+   @Override
+   public long getMemoryAddress() {
+     return this.block.getMemoryAddress();
+   }
+   @Override
+   public int getBlockSize() {
+     return this.block.getBlockSize();
+   }
+   @Override
+   public MemoryBlock getNextBlock() {
+     return this.ma.getMemoryInspector().getBlockAfter(this);
+   }
+   public int getSlabId() {
+     return this.ma.findSlab(getMemoryAddress());
+   }
+   @Override
+   public int getFreeListId() {
+     return this.block.getFreeListId();
+   }
+   public int getRefCount() {
+     return this.block.getRefCount(); // delegate to fix GEODE-489
+   }
+   public String getDataType() {
+     if (this.block.getDataType() != null) {
+       return this.block.getDataType();
+     }
+     if (!isSerialized()) {
+       // byte array
+       if (isCompressed()) {
 -        return "compressed byte[" + ((Chunk)this.block).getDataSize() + "]";
++        return "compressed byte[" + ((ObjectChunk)this.block).getDataSize() + "]";
+       } else {
 -        return "byte[" + ((Chunk)this.block).getDataSize() + "]";
++        return "byte[" + ((ObjectChunk)this.block).getDataSize() + "]";
+       }
+     } else if (isCompressed()) {
 -      return "compressed object of size " + ((Chunk)this.block).getDataSize();
++      return "compressed object of size " + ((ObjectChunk)this.block).getDataSize();
+     }
+     //Object obj = EntryEventImpl.deserialize(((Chunk)this.block).getRawBytes());
 -    byte[] bytes = ((Chunk)this.block).getRawBytes();
++    byte[] bytes = ((ObjectChunk)this.block).getRawBytes();
+     return DataType.getDataType(bytes);
+   }
+   public boolean isSerialized() {
+     return this.block.isSerialized();
+   }
+   public boolean isCompressed() {
+     return this.block.isCompressed();
+   }
+   @Override
+   public Object getDataValue() {
+     String dataType = getDataType();
+     if (dataType == null || dataType.equals("N/A")) {
+       return null;
+     } else if (isCompressed()) {
 -      return ((Chunk)this.block).getCompressedBytes();
++      return ((ObjectChunk)this.block).getCompressedBytes();
+     } else if (!isSerialized()) {
+       // byte array
+       //return "byte[" + ((Chunk)this.block).getDataSize() + "]";
 -      return ((Chunk)this.block).getRawBytes();
++      return ((ObjectChunk)this.block).getRawBytes();
+     } else {
+       try {
 -        byte[] bytes = ((Chunk)this.block).getRawBytes();
++        byte[] bytes = ((ObjectChunk)this.block).getRawBytes();
+         return DataSerializer.readObject(DataType.getDataInput(bytes));
+       } catch (IOException e) {
+         e.printStackTrace();
+         return "IOException:" + e.getMessage();
+       } catch (ClassNotFoundException e) {
+         e.printStackTrace();
+         return "ClassNotFoundException:" + e.getMessage();
+       } catch (CacheClosedException e) {
+         e.printStackTrace();
+         return "CacheClosedException:" + e.getMessage();
+       }
+     }
+   }
+   @Override
+   public String toString() {
+     final StringBuilder sb = new StringBuilder(MemoryBlock.class.getSimpleName());
+     sb.append("{");
+     sb.append("MemoryAddress=").append(getMemoryAddress());
+     sb.append(", State=").append(getState());
+     sb.append(", BlockSize=").append(getBlockSize());
+     sb.append(", SlabId=").append(getSlabId());
+     sb.append(", FreeListId=");
+     if (getState() == State.UNUSED || getState() == State.ALLOCATED) {
+       sb.append("NONE");
+     } else if (getFreeListId() == -1) {
+       sb.append("HUGE");
+     } else {
+       sb.append(getFreeListId());
+     }
+     sb.append(", RefCount=").append(getRefCount());
 -    ChunkType ct = this.getChunkType();
 -    if (ct != null) {
 -      sb.append(", " + ct);
 -    }
+     sb.append(", isSerialized=").append(isSerialized());
+     sb.append(", isCompressed=").append(isCompressed());
+     sb.append(", DataType=").append(getDataType());
+     {
+       sb.append(", DataValue=");
+       Object dataValue = getDataValue();
+       if (dataValue instanceof byte[]) {
+         byte[] ba = (byte[]) dataValue;
+         if (ba.length < 1024) {
+           sb.append(Arrays.toString(ba));
+         } else {
+           sb.append("<byte array of length " + ba.length + ">");
+         }
+       } else {
+         sb.append(dataValue);
+       }
+     }
+     sb.append("}");
+     return sb.toString();
+   }
 -  @Override
 -  public ChunkType getChunkType() {
 -    return this.block.getChunkType();
 -  }
+   
+   @Override
+   public boolean equals(Object o) {
+     if (o instanceof MemoryBlockNode) {
+       o = ((MemoryBlockNode)o).block;
+     }
+     return this.block.equals(o);
+   }
+   
+   @Override
+   public int hashCode() {
+     return this.block.hashCode();
+   }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/ObjectChunk.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/ObjectChunk.java
index 0000000,0000000..29e6956
new file mode 100644
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/ObjectChunk.java
@@@ -1,0 -1,0 +1,737 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements.  See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.  You may obtain a copy of the License at
++ *
++ *      http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package com.gemstone.gemfire.internal.offheap;
++
++import java.io.DataOutput;
++import java.io.IOException;
++import java.lang.reflect.Constructor;
++import java.lang.reflect.InvocationTargetException;
++import java.lang.reflect.Method;
++import java.nio.ByteBuffer;
++
++import com.gemstone.gemfire.cache.Region;
++import com.gemstone.gemfire.internal.DSCODE;
++import com.gemstone.gemfire.internal.HeapDataOutputStream;
++import com.gemstone.gemfire.internal.InternalDataSerializer;
++import com.gemstone.gemfire.internal.cache.EntryEventImpl;
++import com.gemstone.gemfire.internal.cache.RegionEntry;
++import com.gemstone.gemfire.internal.cache.RegionEntryContext;
++import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
++
++/**
++   * A chunk that stores a Java object.
++   * Currently the object stored in this chunk
++   * is always an entry value of a Region.
++   * Note: this class has a natural ordering that is inconsistent with equals.
++   * Instances of this class should have a short lifetime. We do not store references
++   * to it in the cache. Instead the memoryAddress is stored in a primitive field in
++   * the cache and if used it will then, if needed, create an instance of this class.
++   */
++  public class ObjectChunk extends OffHeapCachedDeserializable implements Comparable<ObjectChunk>, MemoryBlock {
++    /**
++     * The unsafe memory address of the first byte of this chunk
++     */
++    private final long memoryAddress;
++    
++    /**
++     * The useCount, chunkSize, dataSizeDelta, isSerialized, and isCompressed
++     * are all stored in off heap memory in a HEADER. This saves heap memory
++     * by using off heap.
++     */
++    public final static int OFF_HEAP_HEADER_SIZE = 4 + 4;
++    /**
++     * We need to smallest chunk to at least have enough room for a hdr
++     * and for an off heap ref (which is a long).
++     */
++    public final static int MIN_CHUNK_SIZE = OFF_HEAP_HEADER_SIZE + 8;
++    /**
++     * int field.
++     * The number of bytes in this chunk.
++     */
++    private final static int CHUNK_SIZE_OFFSET = 0;
++    /**
++     * Volatile int field
++     * The upper two bits are used for the isSerialized
++     * and isCompressed flags.
++     * The next three bits are unused.
++     * The lower 3 bits of the most significant byte contains a magic number to help us detect
++     * if we are changing the ref count of an object that has been released.
++     * The next byte contains the dataSizeDelta.
++     * The number of bytes of logical data in this chunk.
++     * Since the number of bytes of logical data is always <= chunkSize
++     * and since chunkSize never changes, we have dataSize be
++     * a delta whose max value would be HUGE_MULTIPLE-1.
++     * The lower two bytes contains the use count.
++     */
++    final static int REF_COUNT_OFFSET = 4;
++    /**
++     * The upper two bits are used for the isSerialized
++     * and isCompressed flags.
++     */
++    final static int IS_SERIALIZED_BIT =    0x80000000;
++    final static int IS_COMPRESSED_BIT =    0x40000000;
++    // UNUSED 0x38000000
++    final static int MAGIC_MASK = 0x07000000;
++    final static int MAGIC_NUMBER = 0x05000000;
++    final static int DATA_SIZE_DELTA_MASK = 0x00ff0000;
++    final static int DATA_SIZE_SHIFT = 16;
++    final static int REF_COUNT_MASK =       0x0000ffff;
++    final static int MAX_REF_COUNT = 0xFFFF;
++    final static long FILL_PATTERN = 0x3c3c3c3c3c3c3c3cL;
++    final static byte FILL_BYTE = 0x3c;
++    
++    protected ObjectChunk(long memoryAddress, int chunkSize) {
++      SimpleMemoryAllocatorImpl.validateAddressAndSize(memoryAddress, chunkSize);
++      this.memoryAddress = memoryAddress;
++      setSize(chunkSize);
++      UnsafeMemoryChunk.writeAbsoluteIntVolatile(getMemoryAddress()+REF_COUNT_OFFSET, MAGIC_NUMBER);
++    }
++    public void readyForFree() {
++      UnsafeMemoryChunk.writeAbsoluteIntVolatile(getMemoryAddress()+REF_COUNT_OFFSET, 0);
++    }
++    public void readyForAllocation() {
++      if (!UnsafeMemoryChunk.writeAbsoluteIntVolatile(getMemoryAddress()+REF_COUNT_OFFSET, 0, MAGIC_NUMBER)) {
++        throw new IllegalStateException("Expected 0 but found " + Integer.toHexString(UnsafeMemoryChunk.readAbsoluteIntVolatile(getMemoryAddress()+REF_COUNT_OFFSET)));
++      }
++    }
++    /**
++     * Should only be used by FakeChunk subclass
++     */
++    protected ObjectChunk() {
++      this.memoryAddress = 0L;
++    }
++    
++    /**
++     * Used to create a Chunk given an existing, already allocated,
++     * memoryAddress. The off heap header has already been initialized.
++     */
++    protected ObjectChunk(long memoryAddress) {
++      SimpleMemoryAllocatorImpl.validateAddress(memoryAddress);
++      this.memoryAddress = memoryAddress;
++    }
++    
++    protected ObjectChunk(ObjectChunk chunk) {
++      this.memoryAddress = chunk.memoryAddress;
++    }
++    
++    /**
++     * Throw an exception if this chunk is not allocated
++     */
++    public void checkIsAllocated() {
++      int originalBits = UnsafeMemoryChunk.readAbsoluteIntVolatile(this.memoryAddress+REF_COUNT_OFFSET);
++      if ((originalBits&MAGIC_MASK) != MAGIC_NUMBER) {
++        throw new IllegalStateException("It looks like this off heap memory was already freed. rawBits=" + Integer.toHexString(originalBits));
++      }
++    }
++    
++    public void incSize(int inc) {
++      setSize(getSize()+inc);
++    }
++    
++    protected void beforeReturningToAllocator() {
++      
++    }
++
++    @Override
++    public int getSize() {
++      return getSize(this.memoryAddress);
++    }
++
++    public void setSize(int size) {
++      setSize(this.memoryAddress, size);
++    }
++
++    public long getMemoryAddress() {
++      return this.memoryAddress;
++    }
++    
++    public int getDataSize() {
++      /*int dataSizeDelta = UnsafeMemoryChunk.readAbsoluteInt(this.memoryAddress+REF_COUNT_OFFSET);
++      dataSizeDelta &= DATA_SIZE_DELTA_MASK;
++      dataSizeDelta >>= DATA_SIZE_SHIFT;
++      return getSize() - dataSizeDelta;*/
++      return getDataSize(this.memoryAddress);
++    }
++    
++    protected static int getDataSize(long memoryAdress) {
++      int dataSizeDelta = UnsafeMemoryChunk.readAbsoluteInt(memoryAdress+REF_COUNT_OFFSET);
++      dataSizeDelta &= DATA_SIZE_DELTA_MASK;
++      dataSizeDelta >>= DATA_SIZE_SHIFT;
++      return getSize(memoryAdress) - dataSizeDelta;
++    }
++    
++    protected long getBaseDataAddress() {
++      return this.memoryAddress+OFF_HEAP_HEADER_SIZE;
++    }
++    protected int getBaseDataOffset() {
++      return 0;
++    }
++    
++    /**
++     * Creates and returns a direct ByteBuffer that contains the contents of this Chunk.
++     * Note that the returned ByteBuffer has a reference to this chunk's
++     * off-heap address so it can only be used while this Chunk is retained.
++     * @return the created direct byte buffer or null if it could not be created.
++     */
++    @Unretained
++    public ByteBuffer createDirectByteBuffer() {
++      return basicCreateDirectByteBuffer(getBaseDataAddress(), getDataSize());
++    }
++    @Override
++    public void sendTo(DataOutput out) throws IOException {
++      if (!this.isCompressed() && out instanceof HeapDataOutputStream) {
++        ByteBuffer bb = createDirectByteBuffer();
++        if (bb != null) {
++          HeapDataOutputStream hdos = (HeapDataOutputStream) out;
++          if (this.isSerialized()) {
++            hdos.write(bb);
++          } else {
++            hdos.writeByte(DSCODE.BYTE_ARRAY);
++            InternalDataSerializer.writeArrayLength(bb.remaining(), hdos);
++            hdos.write(bb);
++          }
++          return;
++        }
++      }
++      super.sendTo(out);
++    }
++    
++    @Override
++    public void sendAsByteArray(DataOutput out) throws IOException {
++      if (!isCompressed() && out instanceof HeapDataOutputStream) {
++        ByteBuffer bb = createDirectByteBuffer();
++        if (bb != null) {
++          HeapDataOutputStream hdos = (HeapDataOutputStream) out;
++          InternalDataSerializer.writeArrayLength(bb.remaining(), hdos);
++          hdos.write(bb);
++          return;
++        }
++      }
++      super.sendAsByteArray(out);
++    }
++       
++    private static volatile Class dbbClass = null;
++    private static volatile Constructor dbbCtor = null;
++    private static volatile boolean dbbCreateFailed = false;
++    
++    /**
++     * @return the created direct byte buffer or null if it could not be created.
++     */
++    private static ByteBuffer basicCreateDirectByteBuffer(long baseDataAddress, int dataSize) {
++      if (dbbCreateFailed) {
++        return null;
++      }
++      Constructor ctor = dbbCtor;
++      if (ctor == null) {
++        Class c = dbbClass;
++        if (c == null) {
++          try {
++            c = Class.forName("java.nio.DirectByteBuffer");
++          } catch (ClassNotFoundException e) {
++            //throw new IllegalStateException("Could not find java.nio.DirectByteBuffer", e);
++            dbbCreateFailed = true;
++            dbbAddressFailed = true;
++            return null;
++          }
++          dbbClass = c;
++        }
++        try {
++          ctor = c.getDeclaredConstructor(long.class, int.class);
++        } catch (NoSuchMethodException | SecurityException e) {
++          //throw new IllegalStateException("Could not get constructor DirectByteBuffer(long, int)", e);
++          dbbClass = null;
++          dbbCreateFailed = true;
++          return null;
++        }
++        ctor.setAccessible(true);
++        dbbCtor = ctor;
++      }
++      try {
++        return (ByteBuffer)ctor.newInstance(baseDataAddress, dataSize);
++      } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
++        //throw new IllegalStateException("Could not create an instance using DirectByteBuffer(long, int)", e);
++        dbbClass = null;
++        dbbCtor = null;
++        dbbCreateFailed = true;
++        return null;
++      }
++    }
++    private static volatile Method dbbAddressMethod = null;
++    private static volatile boolean dbbAddressFailed = false;
++    
++    /**
++     * Returns the address of the Unsafe memory for the first byte of a direct ByteBuffer.
++     * If the buffer is not direct or the address can not be obtained return 0.
++     */
++    public static long getDirectByteBufferAddress(ByteBuffer bb) {
++      if (!bb.isDirect()) {
++        return 0L;
++      }
++      if (dbbAddressFailed) {
++        return 0L;
++      }
++      Method m = dbbAddressMethod;
++      if (m == null) {
++        Class c = dbbClass;
++        if (c == null) {
++          try {
++            c = Class.forName("java.nio.DirectByteBuffer");
++          } catch (ClassNotFoundException e) {
++            //throw new IllegalStateException("Could not find java.nio.DirectByteBuffer", e);
++            dbbCreateFailed = true;
++            dbbAddressFailed = true;
++            return 0L;
++          }
++          dbbClass = c;
++        }
++        try {
++          m = c.getDeclaredMethod("address");
++        } catch (NoSuchMethodException | SecurityException e) {
++          //throw new IllegalStateException("Could not get method DirectByteBuffer.address()", e);
++          dbbClass = null;
++          dbbAddressFailed = true;
++          return 0L;
++        }
++        m.setAccessible(true);
++        dbbAddressMethod = m;
++      }
++      try {
++        return (Long)m.invoke(bb);
++      } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
++        //throw new IllegalStateException("Could not create an invoke DirectByteBuffer.address()", e);
++        dbbClass = null;
++        dbbAddressMethod = null;
++        dbbAddressFailed = true;
++        return 0L;
++      }
++    }
++    /**
++     * Returns an address that can be used with unsafe apis to access this chunks memory.
++     * @param offset the offset from this chunk's first byte of the byte the returned address should point to. Must be >= 0.
++     * @param size the number of bytes that will be read using the returned address. Assertion will use this to verify that all the memory accessed belongs to this chunk. Must be > 0.
++     * @return a memory address that can be used with unsafe apis
++     */
++    public long getUnsafeAddress(int offset, int size) {
++      assert offset >= 0 && offset + size <= getDataSize(): "Offset=" + offset + ",size=" + size + ",dataSize=" + getDataSize() + ", chunkSize=" + getSize() + ", but offset + size must be <= " + getDataSize();
++      assert size > 0;
++      long result = getBaseDataAddress() + offset;
++      // validateAddressAndSizeWithinSlab(result, size);
++      return result;
++    }
++    
++    @Override
++    public byte readByte(int offset) {
++      assert offset < getDataSize();
++      return UnsafeMemoryChunk.readAbsoluteByte(getBaseDataAddress() + offset);
++    }
++
++    @Override
++    public void writeByte(int offset, byte value) {
++      assert offset < getDataSize();
++      UnsafeMemoryChunk.writeAbsoluteByte(getBaseDataAddress() + offset, value);
++    }
++
++    @Override
++    public void readBytes(int offset, byte[] bytes) {
++      readBytes(offset, bytes, 0, bytes.length);
++    }
++
++    @Override
++    public void writeBytes(int offset, byte[] bytes) {
++      writeBytes(offset, bytes, 0, bytes.length);
++    }
++
++    public long getAddressForReading(int offset, int size) {
++      //delegate to getUnsafeAddress - as both the methods does return the memory address from given offset
++      return getUnsafeAddress(offset, size);
++    }
++    
++    @Override
++    public void readBytes(int offset, byte[] bytes, int bytesOffset, int size) {
++      assert offset+size <= getDataSize();
++      UnsafeMemoryChunk.readAbsoluteBytes(getBaseDataAddress() + offset, bytes, bytesOffset, size);
++    }
++
++    @Override
++    public void writeBytes(int offset, byte[] bytes, int bytesOffset, int size) {
++      assert offset+size <= getDataSize();
++      UnsafeMemoryChunk.writeAbsoluteBytes(getBaseDataAddress() + offset, bytes, bytesOffset, size);
++    }
++    
++    @Override
++    public void release() {
++      release(this.memoryAddress);
++     }
++
++    @Override
++    public int compareTo(ObjectChunk o) {
++      int result = Integer.signum(getSize() - o.getSize());
++      if (result == 0) {
++        // For the same sized chunks we really don't care about their order
++        // but we need compareTo to only return 0 if the two chunks are identical
++        result = Long.signum(getMemoryAddress() - o.getMemoryAddress());
++      }
++      return result;
++    }
++    
++    @Override
++    public boolean equals(Object o) {
++      if (o instanceof ObjectChunk) {
++        return getMemoryAddress() == ((ObjectChunk) o).getMemoryAddress();
++      }
++      return false;
++    }
++    
++    @Override
++    public int hashCode() {
++      long value = this.getMemoryAddress();
++      return (int)(value ^ (value >>> 32));
++    }
++
++    // OffHeapCachedDeserializable methods 
++    
++    @Override
++    public void setSerializedValue(byte[] value) {
++      writeBytes(0, value);
++    }
++    
++    public byte[] getDecompressedBytes(RegionEntryContext context) {
++      byte[] result = getCompressedBytes();
++      long time = context.getCachePerfStats().startDecompression();
++      result = context.getCompressor().decompress(result);
++      context.getCachePerfStats().endDecompression(time);      
++      return result;
++    }
++    
++    /**
++     * Returns the raw possibly compressed bytes of this chunk
++     */
++    public byte[] getCompressedBytes() {
++      byte[] result = new byte[getDataSize()];
++      readBytes(0, result);
++      //debugLog("reading", true);
++      SimpleMemoryAllocatorImpl.getAllocator().getStats().incReads();
++      return result;
++    }
++    protected byte[] getRawBytes() {
++      byte[] result = getCompressedBytes();
++      // TODO OFFHEAP: change the following to assert !isCompressed();
++      if (isCompressed()) {
++        throw new UnsupportedOperationException();
++      }
++      return result;
++    }
++
++    @Override
++    public byte[] getSerializedValue() {
++      byte [] result = getRawBytes();
++      if (!isSerialized()) {
++        // The object is a byte[]. So we need to make it look like a serialized byte[] in our result
++        result = EntryEventImpl.serialize(result);
++      }
++      return result;
++    }
++    
++    @Override
++    public Object getDeserializedValue(Region r, RegionEntry re) {
++      if (isSerialized()) {
++        // TODO OFFHEAP: debug deserializeChunk
++        return EntryEventImpl.deserialize(getRawBytes());
++        //assert !isCompressed();
++        //return EntryEventImpl.deserializeChunk(this);
++      } else {
++        return getRawBytes();
++      }
++    }
++    
++    /**
++     * We want this to include memory overhead so use getSize() instead of getDataSize().
++     */
++    @Override
++    public int getSizeInBytes() {
++      // Calling getSize includes the off heap header size.
++      // We do not add anything to this since the size of the reference belongs to the region entry size
++      // not the size of this object.
++      return getSize();
++    }
++
++    @Override
++    public int getValueSizeInBytes() {
++      return getDataSize();
++    }
++
++    @Override
++    public void copyBytes(int src, int dst, int size) {
++      throw new UnsupportedOperationException("Implement if used");
++//      assert src+size <= getDataSize();
++//      assert dst+size < getDataSize();
++//      getSlabs()[this.getSlabIdx()].copyBytes(getBaseDataAddress()+src, getBaseDataAddress()+dst, size);
++    }
++
++    @Override
++    public boolean isSerialized() {
++      return (UnsafeMemoryChunk.readAbsoluteInt(this.memoryAddress+REF_COUNT_OFFSET) & IS_SERIALIZED_BIT) != 0;
++    }
++
++    @Override
++    public boolean isCompressed() {
++      return (UnsafeMemoryChunk.readAbsoluteInt(this.memoryAddress+REF_COUNT_OFFSET) & IS_COMPRESSED_BIT) != 0;
++    }
++
++    @Override
++    public boolean retain() {
++      return retain(this.memoryAddress);
++    }
++
++    @Override
++    public int getRefCount() {
++      return getRefCount(this.memoryAddress);
++    }
++
++    public static int getSize(long memAddr) {
++      SimpleMemoryAllocatorImpl.validateAddress(memAddr);
++      return UnsafeMemoryChunk.readAbsoluteInt(memAddr+CHUNK_SIZE_OFFSET);
++    }
++    public static void setSize(long memAddr, int size) {
++      SimpleMemoryAllocatorImpl.validateAddressAndSize(memAddr, size);
++      UnsafeMemoryChunk.writeAbsoluteInt(memAddr+CHUNK_SIZE_OFFSET, size);
++    }
++    public static long getNext(long memAddr) {
++      SimpleMemoryAllocatorImpl.validateAddress(memAddr);
++      return UnsafeMemoryChunk.readAbsoluteLong(memAddr+OFF_HEAP_HEADER_SIZE);
++    }
++    public static void setNext(long memAddr, long next) {
++      SimpleMemoryAllocatorImpl.validateAddress(memAddr);
++      UnsafeMemoryChunk.writeAbsoluteLong(memAddr+OFF_HEAP_HEADER_SIZE, next);
++    }
++    
++    /**
++     * Fills the chunk with a repeated byte fill pattern.
++     * @param baseAddress the starting address for a {@link ObjectChunk}.
++     */
++    public static void fill(long baseAddress) {
++      long startAddress = baseAddress + MIN_CHUNK_SIZE;
++      int size = getSize(baseAddress) - MIN_CHUNK_SIZE;
++      
++      UnsafeMemoryChunk.fill(startAddress, size, FILL_BYTE);
++    }
++    
++    /**
++     * Validates that the fill pattern for this chunk has not been disturbed.  This method
++     * assumes the TINY_MULTIPLE is 8 bytes.
++     * @throws IllegalStateException when the pattern has been violated.
++     */
++    public void validateFill() {
++      assert FreeListManager.TINY_MULTIPLE == 8;
++      
++      long startAddress = getMemoryAddress() + MIN_CHUNK_SIZE;
++      int size = getSize() - MIN_CHUNK_SIZE;
++      
++      for(int i = 0;i < size;i += FreeListManager.TINY_MULTIPLE) {
++        if(UnsafeMemoryChunk.readAbsoluteLong(startAddress + i) != FILL_PATTERN) {
++          throw new IllegalStateException("Fill pattern violated for chunk " + getMemoryAddress() + " with size " + getSize());
++        }        
++      }
++    }
++
++    public void setSerialized(boolean isSerialized) {
++      if (isSerialized) {
++        int bits;
++        int originalBits;
++        do {
++          originalBits = UnsafeMemoryChunk.readAbsoluteIntVolatile(this.memoryAddress+REF_COUNT_OFFSET);
++          if ((originalBits&MAGIC_MASK) != MAGIC_NUMBER) {
++            throw new IllegalStateException("It looks like this off heap memory was already freed. rawBits=" + Integer.toHexString(originalBits));
++          }
++          bits = originalBits | IS_SERIALIZED_BIT;
++        } while (!UnsafeMemoryChunk.writeAbsoluteIntVolatile(this.memoryAddress+REF_COUNT_OFFSET, originalBits, bits));
++      }
++    }
++    public void setCompressed(boolean isCompressed) {
++      if (isCompressed) {
++        int bits;
++        int originalBits;
++        do {
++          originalBits = UnsafeMemoryChunk.readAbsoluteIntVolatile(this.memoryAddress+REF_COUNT_OFFSET);
++          if ((originalBits&MAGIC_MASK) != MAGIC_NUMBER) {
++            throw new IllegalStateException("It looks like this off heap memory was already freed. rawBits=" + Integer.toHexString(originalBits));
++          }
++          bits = originalBits | IS_COMPRESSED_BIT;
++        } while (!UnsafeMemoryChunk.writeAbsoluteIntVolatile(this.memoryAddress+REF_COUNT_OFFSET, originalBits, bits));
++      }
++    }
++    public void setDataSize(int dataSize) { // KIRK
++      assert dataSize <= getSize();
++      int delta = getSize() - dataSize;
++      assert delta <= (DATA_SIZE_DELTA_MASK >> DATA_SIZE_SHIFT);
++      delta <<= DATA_SIZE_SHIFT;
++      int bits;
++      int originalBits;
++      do {
++        originalBits = UnsafeMemoryChunk.readAbsoluteIntVolatile(this.memoryAddress+REF_COUNT_OFFSET);
++        if ((originalBits&MAGIC_MASK) != MAGIC_NUMBER) {
++          throw new IllegalStateException("It looks like this off heap memory was already freed. rawBits=" + Integer.toHexString(originalBits));
++        }
++        bits = originalBits;
++        bits &= ~DATA_SIZE_DELTA_MASK; // clear the old dataSizeDelta bits
++        bits |= delta; // set the dataSizeDelta bits to the new delta value
++      } while (!UnsafeMemoryChunk.writeAbsoluteIntVolatile(this.memoryAddress+REF_COUNT_OFFSET, originalBits, bits));
++    }
++    
++    public void initializeUseCount() {
++      int rawBits;
++      do {
++        rawBits = UnsafeMemoryChunk.readAbsoluteIntVolatile(this.memoryAddress+REF_COUNT_OFFSET);
++        if ((rawBits&MAGIC_MASK) != MAGIC_NUMBER) {
++          throw new IllegalStateException("It looks like this off heap memory was already freed. rawBits=" + Integer.toHexString(rawBits));
++        }
++        int uc = rawBits & REF_COUNT_MASK;
++        if (uc != 0) {
++          throw new IllegalStateException("Expected use count to be zero but it was: " + uc + " rawBits=0x" + Integer.toHexString(rawBits));
++        }
++      } while (!UnsafeMemoryChunk.writeAbsoluteIntVolatile(this.memoryAddress+REF_COUNT_OFFSET, rawBits, rawBits+1));
++    }
++
++    public static int getRefCount(long memAddr) {
++      return UnsafeMemoryChunk.readAbsoluteInt(memAddr+REF_COUNT_OFFSET) & REF_COUNT_MASK;
++    }
++
++    public static boolean retain(long memAddr) {
++      SimpleMemoryAllocatorImpl.validateAddress(memAddr);
++      int uc;
++      int rawBits;
++      int retryCount = 0;
++      do {
++        rawBits = UnsafeMemoryChunk.readAbsoluteIntVolatile(memAddr+REF_COUNT_OFFSET);
++        if ((rawBits&MAGIC_MASK) != MAGIC_NUMBER) {
++          // same as uc == 0
++          // TODO MAGIC_NUMBER rethink its use and interaction with compactor fragments
++          return false;
++        }
++        uc = rawBits & REF_COUNT_MASK;
++        if (uc == MAX_REF_COUNT) {
++          throw new IllegalStateException("Maximum use count exceeded. rawBits=" + Integer.toHexString(rawBits));
++        } else if (uc == 0) {
++          return false;
++        }
++        retryCount++;
++        if (retryCount > 1000) {
++          throw new IllegalStateException("tried to write " + (rawBits+1) + " to @" + Long.toHexString(memAddr) + " 1,000 times.");
++        }
++      } while (!UnsafeMemoryChunk.writeAbsoluteIntVolatile(memAddr+REF_COUNT_OFFSET, rawBits, rawBits+1));
++      //debugLog("use inced ref count " + (uc+1) + " @" + Long.toHexString(memAddr), true);
++      if (ReferenceCountHelper.trackReferenceCounts()) {
++        ReferenceCountHelper.refCountChanged(memAddr, false, uc+1);
++      }
++
++      return true;
++    }
++    public static void release(final long memAddr) {
++      release(memAddr, null);
++    }
++    static void release(final long memAddr, FreeListManager freeListManager) {
++      SimpleMemoryAllocatorImpl.validateAddress(memAddr);
++      int newCount;
++      int rawBits;
++      boolean returnToAllocator;
++      do {
++        returnToAllocator = false;
++        rawBits = UnsafeMemoryChunk.readAbsoluteIntVolatile(memAddr+REF_COUNT_OFFSET);
++        if ((rawBits&MAGIC_MASK) != MAGIC_NUMBER) {
++          String msg = "It looks like off heap memory @" + Long.toHexString(memAddr) + " was already freed. rawBits=" + Integer.toHexString(rawBits) + " history=" + ReferenceCountHelper.getFreeRefCountInfo(memAddr);
++          //debugLog(msg, true);
++          throw new IllegalStateException(msg);
++        }
++        int curCount = rawBits&REF_COUNT_MASK;
++        if ((curCount) == 0) {
++          //debugLog("too many frees @" + Long.toHexString(memAddr), true);
++          throw new IllegalStateException("Memory has already been freed." + " history=" + ReferenceCountHelper.getFreeRefCountInfo(memAddr) /*+ System.identityHashCode(this)*/);
++        }
++        if (curCount == 1) {
++          newCount = 0; // clear the use count, bits, and the delta size since it will be freed.
++          returnToAllocator = true;
++        } else {
++          newCount = rawBits-1;
++        }
++      } while (!UnsafeMemoryChunk.writeAbsoluteIntVolatile(memAddr+REF_COUNT_OFFSET, rawBits, newCount));
++      //debugLog("free deced ref count " + (newCount&USE_COUNT_MASK) + " @" + Long.toHexString(memAddr), true);
++      if (returnToAllocator ) {
++       if (ReferenceCountHelper.trackReferenceCounts()) {
++          if (ReferenceCountHelper.trackFreedReferenceCounts()) {
++            ReferenceCountHelper.refCountChanged(memAddr, true, newCount&REF_COUNT_MASK);
++          }
++          ReferenceCountHelper.freeRefCountInfo(memAddr);
++        }
++        if (freeListManager == null) {
++          freeListManager = SimpleMemoryAllocatorImpl.getAllocator().getFreeListManager();
++        }
++        freeListManager.free(memAddr);
++      } else {
++        if (ReferenceCountHelper.trackReferenceCounts()) {
++          ReferenceCountHelper.refCountChanged(memAddr, true, newCount&REF_COUNT_MASK);
++        }
++      }
++    }
++    
++    @Override
++    public String toString() {
++      return toStringForOffHeapByteSource();
++      // This old impl is not safe because it calls getDeserializedForReading and we have code that call toString that does not inc the refcount.
++      // Also if this Chunk is compressed we don't know how to decompress it.
++      //return super.toString() + ":<dataSize=" + getDataSize() + " refCount=" + getRefCount() + " addr=" + getMemoryAddress() + " storedObject=" + getDeserializedForReading() + ">";
++    }
++    
++    protected String toStringForOffHeapByteSource() {
++      return super.toString() + ":<dataSize=" + getDataSize() + " refCount=" + getRefCount() + " addr=" + Long.toHexString(getMemoryAddress()) + ">";
++    }
++    
++    @Override
++    public State getState() {
++      if (getRefCount() > 0) {
++        return State.ALLOCATED;
++      } else {
++        return State.DEALLOCATED;
++      }
++    }
++    @Override
++    public MemoryBlock getNextBlock() {
++      throw new UnsupportedOperationException();
++    }
++    @Override
++    public int getBlockSize() {
++      return getSize();
++    }
++    @Override
++    public int getSlabId() {
++      throw new UnsupportedOperationException();
++    }
++    @Override
++    public int getFreeListId() {
++      return -1;
++    }
++    @Override
++    public String getDataType() {
++      return null;
++    }
++    @Override
++    public Object getDataValue() {
++      return null;
++    }
++    public ObjectChunk slice(int position, int limit) {
++      return new ObjectChunkSlice(this, position, limit);
++    }
++  }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/ObjectChunkSlice.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/ObjectChunkSlice.java
index 0000000,0000000..3d6bf57
new file mode 100644
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/ObjectChunkSlice.java
@@@ -1,0 -1,0 +1,44 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements.  See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.  You may obtain a copy of the License at
++ *
++ *      http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package com.gemstone.gemfire.internal.offheap;
++
++/**
++ * Represents a slice of an ObjectChunk.
++ * A slice is a subsequence of the bytes stored in an ObjectChunk.
++ */
++public class ObjectChunkSlice extends ObjectChunk {
++  private final int offset;
++  private final int capacity;
++  public ObjectChunkSlice(ObjectChunk objectChunk, int position, int limit) {
++    super(objectChunk);
++    this.offset = objectChunk.getBaseDataOffset() + position;
++    this.capacity = limit - position;
++  }
++  @Override
++  public int getDataSize() {
++    return this.capacity;
++  }
++  
++  @Override
++  protected long getBaseDataAddress() {
++    return super.getBaseDataAddress() + this.offset;
++  }
++  @Override
++  protected int getBaseDataOffset() {
++    return this.offset;
++  }
++}