Posted to commits@geode.apache.org by ud...@apache.org on 2016/02/22 22:44:03 UTC

[075/100] [abbrv] incubator-geode git commit: GEODE-917: Merge branch 'feature/GEODE-917' into develop

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/ObjectChunkWithHeapForm.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/ObjectChunkWithHeapForm.java
index 0000000,0000000..5020c7a
new file mode 100644
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/ObjectChunkWithHeapForm.java
@@@ -1,0 -1,0 +1,40 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements.  See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.  You may obtain a copy of the License at
++ *
++ *      http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package com.gemstone.gemfire.internal.offheap;
++
++/**
++ * Used to keep the heapForm around while an operation is still in progress.
++ * This allows the operation to access the serialized heap form instead of copying
++ * it from offheap. See bug 48135.
++ */
++public class ObjectChunkWithHeapForm extends ObjectChunk {
++  private final byte[] heapForm;
++  
++  public ObjectChunkWithHeapForm(ObjectChunk chunk, byte[] heapForm) {
++    super(chunk);
++    this.heapForm = heapForm;
++  }
++
++  @Override
++  protected byte[] getRawBytes() {
++    return this.heapForm;
++  }
++  
++  public ObjectChunk getChunkWithoutHeapForm() {
++    return new ObjectChunk(this);
++  }
++}
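
For context, a minimal sketch (not from the commit) of how the wrapper above is meant to be used during an operation; the helper method names below are illustrative only.

    // Sketch: wrap an off-heap chunk with the heap form the operation already holds,
    // so getRawBytes() serves the heap copy instead of re-reading off-heap memory.
    static ObjectChunk wrapForOperation(ObjectChunk stored, byte[] serializedBytes) {
      return new ObjectChunkWithHeapForm(stored, serializedBytes);
    }

    // When the operation completes, drop the heap form but keep the same off-heap chunk.
    static ObjectChunk unwrap(ObjectChunkWithHeapForm withHeapForm) {
      return withHeapForm.getChunkWithoutHeapForm();
    }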

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/OffHeapCachedDeserializable.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/OffHeapCachedDeserializable.java
index 0000000,1ec722d..bd380e2
mode 000000,100644..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/OffHeapCachedDeserializable.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/OffHeapCachedDeserializable.java
@@@ -1,0 -1,142 +1,142 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package com.gemstone.gemfire.internal.offheap;
+ 
+ import com.gemstone.gemfire.cache.Region;
+ import com.gemstone.gemfire.internal.DSCODE;
+ import com.gemstone.gemfire.internal.cache.BytesAndBitsForCompactor;
+ import com.gemstone.gemfire.internal.cache.EntryBits;
+ import com.gemstone.gemfire.internal.cache.RegionEntry;
+ import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
+ 
+ /**
+  * This abstract class is intended to be used by {@link MemoryChunk} implementations that also want
+  * to be a CachedDeserializable.
+  * 
+  * @author darrel
+  * @since 9.0
+  */
+ public abstract class OffHeapCachedDeserializable extends AbstractStoredObject implements MemoryChunkWithRefCount {
+   public abstract void setSerializedValue(byte[] value);
+   @Override
+   public abstract byte[] getSerializedValue();
+   @Override
+   public abstract int getSizeInBytes();
+   @Override
+   public abstract int getValueSizeInBytes();
+   @Override
+   public abstract Object getDeserializedValue(Region r, RegionEntry re);
+ 
+   @Override
+   public void fillSerializedValue(BytesAndBitsForCompactor wrapper, byte userBits) {
+     if (isSerialized()) {
+       userBits = EntryBits.setSerialized(userBits, true);
+     }
 -    wrapper.setChunkData((Chunk) this, userBits);
++    wrapper.setChunkData((ObjectChunk) this, userBits);
+   }
+   
+   String getShortClassName() {
+     String cname = getClass().getName();
+     return cname.substring(getClass().getPackage().getName().length()+1);
+   }
+ 
+   @Override
+   public String toString() {
+     return getShortClassName()+"@"+this.hashCode();
+   }
+   public boolean checkDataEquals(@Unretained OffHeapCachedDeserializable other) {
+     if (this == other) {
+       return true;
+     }
+     if (isSerialized() != other.isSerialized()) {
+       return false;
+     }
+     int mySize = getValueSizeInBytes();
+     if (mySize != other.getValueSizeInBytes()) {
+       return false;
+     }
+     // We want to be able to do this operation without copying any of the data into the heap.
+     // Hopefully the jvm is smart enough to use our stack for this short lived array.
+     final byte[] dataCache1 = new byte[1024];
+     final byte[] dataCache2 = new byte[dataCache1.length];
+     // TODO OFFHEAP: no need to copy to heap. Just get the address of each and compare each byte.
+     int i;
+     // inc it twice since we are reading two different off-heap objects
+     SimpleMemoryAllocatorImpl.getAllocator().getStats().incReads();
+     SimpleMemoryAllocatorImpl.getAllocator().getStats().incReads();
+     for (i=0; i < mySize-(dataCache1.length-1); i+=dataCache1.length) {
+       this.readBytes(i, dataCache1);
+       other.readBytes(i, dataCache2);
+       for (int j=0; j < dataCache1.length; j++) {
+         if (dataCache1[j] != dataCache2[j]) {
+           return false;
+         }
+       }
+     }
+     int bytesToRead = mySize-i;
+     if (bytesToRead > 0) {
+       // need to do one more read which will be less than dataCache.length
+       this.readBytes(i, dataCache1, 0, bytesToRead);
+       other.readBytes(i, dataCache2, 0, bytesToRead);
+       for (int j=0; j < bytesToRead; j++) {
+         if (dataCache1[j] != dataCache2[j]) {
+           return false;
+         }
+       }
+     }
+     return true;
+   }
+   
+   public boolean isSerializedPdxInstance() {
+     byte dsCode = this.readByte(0);
+     return dsCode == DSCODE.PDX || dsCode == DSCODE.PDX_ENUM || dsCode == DSCODE.PDX_INLINE_ENUM;
+   }
+   
+   public boolean checkDataEquals(byte[] serializedObj) {
+     // caller was responsible for checking isSerialized
+     int mySize = getValueSizeInBytes();
+     if (mySize != serializedObj.length) {
+       return false;
+     }
+     // We want to be able to do this operation without copying any of the data into the heap.
+     // Hopefully the jvm is smart enough to use our stack for this short lived array.
+     // TODO OFFHEAP: compare as ByteBuffers?
+     final byte[] dataCache = new byte[1024];
+     int idx=0;
+     int i;
+     SimpleMemoryAllocatorImpl.getAllocator().getStats().incReads();
+     for (i=0; i < mySize-(dataCache.length-1); i+=dataCache.length) {
+       this.readBytes(i, dataCache);
+       for (int j=0; j < dataCache.length; j++) {
+         if (dataCache[j] != serializedObj[idx++]) {
+           return false;
+         }
+       }
+     }
+     int bytesToRead = mySize-i;
+     if (bytesToRead > 0) {
+       // need to do one more read which will be less than dataCache.length
+       this.readBytes(i, dataCache, 0, bytesToRead);
+       for (int j=0; j < bytesToRead; j++) {
+         if (dataCache[j] != serializedObj[idx++]) {
+           return false;
+         }
+       }
+     }
+     return true;
+   }
+ }
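
Both checkDataEquals overloads above follow the same pattern: read the off-heap value through a small fixed-size window (1024 bytes) and compare window by window, so only one short-lived array is ever created on the heap. A self-contained sketch of that loop structure follows; WindowReader is a hypothetical stand-in for the readBytes(offset, dest, destOffset, size) calls and is not an API from the commit.

    interface WindowReader {
      void read(int offset, byte[] dest, int destOffset, int size);
    }

    static boolean windowedEquals(int valueSize, WindowReader reader, byte[] expected) {
      if (valueSize != expected.length) {
        return false;
      }
      final byte[] window = new byte[1024];
      int idx = 0;
      int i;
      // compare all full windows
      for (i = 0; i < valueSize - (window.length - 1); i += window.length) {
        reader.read(i, window, 0, window.length);
        for (int j = 0; j < window.length; j++) {
          if (window[j] != expected[idx++]) {
            return false;
          }
        }
      }
      // compare the final partial window, if any
      int bytesToRead = valueSize - i;
      if (bytesToRead > 0) {
        reader.read(i, window, 0, bytesToRead);
        for (int j = 0; j < bytesToRead; j++) {
          if (window[j] != expected[idx++]) {
            return false;
          }
        }
      }
      return true;
    }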

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/OffHeapRegionEntryHelper.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/OffHeapRegionEntryHelper.java
index 0000000,84e4218..b62d97a
mode 000000,100644..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/OffHeapRegionEntryHelper.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/OffHeapRegionEntryHelper.java
@@@ -1,0 -1,418 +1,406 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package com.gemstone.gemfire.internal.offheap;
+ 
+ import com.gemstone.gemfire.internal.DSCODE;
+ import com.gemstone.gemfire.internal.cache.CachedDeserializableFactory;
+ import com.gemstone.gemfire.internal.cache.DiskEntry;
+ import com.gemstone.gemfire.internal.cache.DiskId;
+ import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+ import com.gemstone.gemfire.internal.cache.OffHeapRegionEntry;
+ import com.gemstone.gemfire.internal.cache.RegionEntryContext;
+ import com.gemstone.gemfire.internal.cache.Token;
+ import com.gemstone.gemfire.internal.offheap.annotations.Released;
+ import com.gemstone.gemfire.internal.offheap.annotations.Retained;
+ import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
+ 
+ /**
+  * This class only has static methods
+  * that operate on instances of {@link OffHeapRegionEntry}.
+  * It allows common code to be shared for all the
+  * classes we have that implement {@link OffHeapRegionEntry}.
+  * 
+  * @author darrel
+  * @since 9.0
+  */
+ public class OffHeapRegionEntryHelper {
+ 
+   protected static final long NULL_ADDRESS = 0L<<1;
+   protected static final long INVALID_ADDRESS = 1L<<1;
+   protected static final long LOCAL_INVALID_ADDRESS = 2L<<1;
+   protected static final long DESTROYED_ADDRESS = 3L<<1;
+   protected static final long REMOVED_PHASE1_ADDRESS = 4L<<1;
+   protected static final long REMOVED_PHASE2_ADDRESS = 5L<<1;
+   protected static final long END_OF_STREAM_ADDRESS = 6L<<1;
+   protected static final long NOT_AVAILABLE_ADDRESS = 7L<<1;
+   protected static final long TOMBSTONE_ADDRESS = 8L<<1;
+   public static final int MAX_LENGTH_FOR_DATA_AS_ADDRESS = 8;
 - /* private static final ChunkFactory chunkFactory ;
 -  static {
 -    ChunkFactory factory;
 -    try {
 -       factory= SimpleMemoryAllocatorImpl.getAllocator().getChunkFactory();
 -         
 -    }catch(CacheClosedException ce) {
 -      factory = null;
 -    }
 -    chunkFactory = factory;
 -  }*/
+   
+   private static final Token[] addrToObj = new Token[]{
+     null,
+     Token.INVALID,
+     Token.LOCAL_INVALID,
+     Token.DESTROYED,
+     Token.REMOVED_PHASE1,
+     Token.REMOVED_PHASE2,
+     Token.END_OF_STREAM,
+     Token.NOT_AVAILABLE,
+     Token.TOMBSTONE,
+   };
+   
+   private static long objectToAddress(@Unretained Object v) {
 -    if (v instanceof Chunk) return ((Chunk) v).getMemoryAddress();
++    if (v instanceof ObjectChunk) return ((ObjectChunk) v).getMemoryAddress();
+     if (v instanceof DataAsAddress) return ((DataAsAddress) v).getEncodedAddress();
+     if (v == null) return NULL_ADDRESS;
+     if (v == Token.TOMBSTONE) return TOMBSTONE_ADDRESS;
+     if (v == Token.INVALID) return INVALID_ADDRESS;
+     if (v == Token.LOCAL_INVALID) return LOCAL_INVALID_ADDRESS;
+     if (v == Token.DESTROYED) return DESTROYED_ADDRESS;
+     if (v == Token.REMOVED_PHASE1) return REMOVED_PHASE1_ADDRESS;
+     if (v == Token.REMOVED_PHASE2) return REMOVED_PHASE2_ADDRESS;
+     if (v == Token.END_OF_STREAM) return END_OF_STREAM_ADDRESS;
+     if (v == Token.NOT_AVAILABLE) return NOT_AVAILABLE_ADDRESS;
+     throw new IllegalStateException("Can not convert " + v + " to an off heap address.");
+   }
+   
+   /**
+    * This method may release the object stored at ohAddress if the result
+    * needs to be decompressed and the decompress parameter is true.
+    * This decompressed result will be on the heap.
+    * 
+    * @param ohAddress OFF_HEAP_ADDRESS
+    * @param decompress true if off-heap value should be decompressed before returning
+    * @param context used for decompression
+    * @return OFF_HEAP_OBJECT (sometimes)
+    */
+   @Unretained @Retained
+   public static Object addressToObject(@Released @Retained long ohAddress, boolean decompress, RegionEntryContext context) {
+     if (isOffHeap(ohAddress)) {
 -      //Chunk chunk = chunkFactory.newChunk(ohAddress);
 -      @Unretained Chunk chunk =  SimpleMemoryAllocatorImpl.getAllocator().getChunkFactory().newChunk(ohAddress);
++      @Unretained ObjectChunk chunk =  new ObjectChunk(ohAddress);
+       @Unretained Object result = chunk;
+       if (decompress && chunk.isCompressed()) {
+         try {
+           // to fix bug 47982 need to:
+           byte[] decompressedBytes = chunk.getDecompressedBytes(context);
+           if (chunk.isSerialized()) {
+             // return a VMCachedDeserializable with the decompressed serialized bytes since chunk is serialized
+             result = CachedDeserializableFactory.create(decompressedBytes);
+           } else {
+             // return a byte[] since chunk is not serialized
+             result = decompressedBytes;
+           }
+         } finally {
+           // decompress is only true when this method is called by _getValueRetain.
+           // In that case the caller has already retained ohAddress because it thought
+           // we would return it. But we have unwrapped it and are returning the decompressed results.
+           // So we need to release the chunk here.
+             chunk.release();
+         }
+       }
+       return result;
+     } else if ((ohAddress & ENCODED_BIT) != 0) {
+       DataAsAddress daa = new DataAsAddress(ohAddress);
+       Object result = daa;
+       if (decompress && daa.isCompressed()) {
+         byte[] decompressedBytes = daa.getDecompressedBytes(context);
+         if (daa.isSerialized()) {
+           // return a VMCachedDeserializable with the decompressed serialized bytes since daa is serialized
+           result = CachedDeserializableFactory.create(decompressedBytes);
+         } else {
+           // return a byte[] since daa is not serialized
+           result = decompressedBytes;
+         }
+       }
+       return result;
+     } else {
+       return addrToObj[(int) ohAddress>>1];
+     }
+   }
+   
+   public static int getSerializedLengthFromDataAsAddress(DataAsAddress dataAsAddress) {
+     final long ohAddress = dataAsAddress.getEncodedAddress();
+     
+      if ((ohAddress & ENCODED_BIT) != 0) {     
+       boolean isLong = (ohAddress & LONG_BIT) != 0;     
+       if (isLong) {
+        return 9;
+       } else {
+         return (int) ((ohAddress & SIZE_MASK) >> SIZE_SHIFT);        
+       }     
+     } else {
+       return 0;
+     }
+   }
+   
+  /*
+   * This method is an optimization for callers that only need to convert an address to a Token;
+   * unlike addressToObject it never materializes or deserializes the value.
+   */
+   private static Token addressToToken(long ohAddress) {
+     if (isOffHeap(ohAddress) || (ohAddress & ENCODED_BIT) != 0) {
+       return Token.NOT_A_TOKEN;
+     } else {
+       return addrToObj[(int) ohAddress>>1];
+     }
+   }
+ 
+   private static void releaseAddress(@Released long ohAddress) {
+     if (isOffHeap(ohAddress)) {
 -      Chunk.release(ohAddress, true);
++      ObjectChunk.release(ohAddress);
+     }
+   }
+   
+   /**
+    * The address in 're' will be @Released.
+    */
+   public static void releaseEntry(@Released OffHeapRegionEntry re) {
+     if (re instanceof DiskEntry) {
+       DiskId did = ((DiskEntry) re).getDiskId();
+       if (did != null && did.isPendingAsync()) {
+         synchronized (did) {
+           // This may not be needed so remove this call if it causes problems.
+           // We no longer need this entry to be written to disk so unschedule it
+           // before we change its value to REMOVED_PHASE2.
+           did.setPendingAsync(false);
+           setValue(re, Token.REMOVED_PHASE2);
+           return;
+         }
+       }
+     }
+     setValue(re, Token.REMOVED_PHASE2);
+   }
+ 
+   public static void releaseEntry(@Unretained OffHeapRegionEntry re, @Released MemoryChunkWithRefCount expectedValue) {
+     long oldAddress = objectToAddress(expectedValue);
+     final long newAddress = objectToAddress(Token.REMOVED_PHASE2);
+     if (re.setAddress(oldAddress, newAddress) || re.getAddress() != newAddress) {
+       releaseAddress(oldAddress);
+     } /*else {
+       if (!calledSetValue || re.getAddress() != newAddress) {
+         expectedValue.release();
+       }
+     }*/
+   }
+   
+   /**
+    * This bit is set to indicate that this address has data encoded in it.
+    */
+   private static long ENCODED_BIT = 1L;
+   /**
+    * This bit is set to indicate that the encoded data is serialized.
+    */
+   static long SERIALIZED_BIT = 2L;
+   /**
+    * This bit is set to indicate that the encoded data is compressed.
+    */
+   static long COMPRESSED_BIT = 4L;
+   /**
+    * This bit is set to indicate that the encoded data is a long whose value fits in 7 bytes.
+    */
+   private static long LONG_BIT = 8L;
+   /**
+    * size is in the range 0..7 so we only need 3 bits.
+    */
+   private static long SIZE_MASK = 0x70L;
+   /**
+    * number of bits to shift the size by.
+    */
+   private static int SIZE_SHIFT = 4;
+   // the msb of this byte is currently unused
+   
+   /**
+    * Returns 0 if the data could not be encoded as an address.
+    */
+   public static long encodeDataAsAddress(byte[] v, boolean isSerialized, boolean isCompressed) {
+     if (v.length < MAX_LENGTH_FOR_DATA_AS_ADDRESS) {
+       long result = 0L;
+       for (int i=0; i < v.length; i++) {
+         result |= v[i] & 0x00ff;
+         result <<= 8;
+       }
+       result |= (v.length << SIZE_SHIFT) | ENCODED_BIT;
+       if (isSerialized) {
+         result |= SERIALIZED_BIT;
+       }
+       if (isCompressed) {
+         result |= COMPRESSED_BIT;
+       }
+       return result;
+     } else if (isSerialized && !isCompressed) {
+       // Check for some special types that take more than 7 bytes to serialize
+       // but that might be able to be inlined with less than 8 bytes.
+       if (v[0] == DSCODE.LONG) {
+         // A long is currently always serialized as 8 bytes (9 if you include the dscode).
+         // But many long values will actually be small enough for us to encode in 7 bytes.
+         if ((v[1] == 0 && (v[2] & 0x80) == 0) || (v[1] == -1 && (v[2] & 0x80) != 0)) {
+           // The long can be encoded as 7 bytes since the most significant byte
+           // is simply an extension of the sign bit of the second most significant byte.
+           long result = 0L;
+           for (int i=2; i < v.length; i++) {
+             result |= v[i] & 0x00ff;
+             result <<= 8;
+           }
+           result |= (7 << SIZE_SHIFT) | LONG_BIT | SERIALIZED_BIT | ENCODED_BIT;
+           return result;
+         }
+       }
+     }
+     return 0L;
+   }
+ 
+   static Object decodeAddressToObject(long ohAddress) {
+       byte[] bytes = decodeAddressToBytes(ohAddress, true, false);
+ 
+       boolean isSerialized = (ohAddress & SERIALIZED_BIT) != 0;
+       if (isSerialized) {
+          return EntryEventImpl.deserialize(bytes);
+       } else {
+           return bytes;
+       }
+   }
+ 
+   static byte[] decodeAddressToBytes(long addr, boolean decompress, boolean compressedOk) {
+     assert (addr & ENCODED_BIT) != 0;
+     boolean isCompressed = (addr & COMPRESSED_BIT) != 0;
+     int size = (int) ((addr & SIZE_MASK) >> SIZE_SHIFT);
+     boolean isLong = (addr & LONG_BIT) != 0;
+     byte[] bytes;
+     if (isLong) {
+       bytes = new byte[9];
+       bytes[0] = DSCODE.LONG;
+       for (int i=8; i >=2; i--) {
+         addr >>= 8;
+         bytes[i] = (byte) (addr & 0x00ff);
+       }
+       if ((bytes[2] & 0x80) != 0) {
+         bytes[1] = -1;
+       } else {
+         bytes[1] = 0;
+       }
+     } else {
+       bytes = new byte[size];
+       for (int i=size-1; i >=0; i--) {
+         addr >>= 8;
+         bytes[i] = (byte) (addr & 0x00ff);
+       }
+     }
+     if (decompress && isCompressed) {
+       if (!compressedOk) {
+         throw new UnsupportedOperationException("Did not expect DataAsAddress to be compressed");
+       }
+     }
+     return bytes;
+   }
+ 
+   /**
+    * The previous value at the address in 're' will be @Released and then the
+    * address in 're' will be set to the @Unretained address of 'v'.
+    */
+   public static void setValue(@Released OffHeapRegionEntry re, @Unretained Object v) {
+     // setValue is called when synced so I don't need to worry
+     // about oldAddress being released by someone else.
+     final long newAddress = objectToAddress(v);
+     long oldAddress;
+     do {
+       oldAddress = re.getAddress();
+     } while (!re.setAddress(oldAddress, newAddress));
+     ReferenceCountHelper.setReferenceCountOwner(re);
+     releaseAddress(oldAddress);
+     ReferenceCountHelper.setReferenceCountOwner(null);
+   }
+  
+   public static Token getValueAsToken(@Unretained OffHeapRegionEntry re) {
+     return addressToToken(re.getAddress());
+   }
+ 
+   @Unretained
+   public static Object _getValue(@Unretained OffHeapRegionEntry re) {
+     return addressToObject(re.getAddress(), false, null); // no context needed so decompress is false
+   }
+   
+   public static boolean isOffHeap(long addr) {
+     if ((addr & ENCODED_BIT) != 0) return false;
+     if (addr < 0) return true;
+     addr >>= 1; // shift right 1 to convert to array index;
+     return addr >= addrToObj.length;
+   }
+ 
+   /**
+    * If the value stored at the location held in 're' is returned, then it will
+    * be Retained.  If the value returned is 're' decompressed into another
+    * off-heap location, then 're' will be Unretained but the new,
+    * decompressed value will be Retained.  Therefore, whichever is returned
+    * (the value at the address in 're' or the decompressed value) it will have
+    * been Retained.
+    * 
+    * @return possible OFF_HEAP_OBJECT (caller must release)
+    */
+   @Retained
+   public static Object _getValueRetain(@Retained @Unretained OffHeapRegionEntry re, boolean decompress, RegionEntryContext context) {
+     int retryCount = 0;
+     @Retained long addr = re.getAddress();
+     while (isOffHeap(addr)) {
 -      if (Chunk.retain(addr)) {
++      if (ObjectChunk.retain(addr)) {
+         @Unretained long addr2 = re.getAddress();
+         if (addr != addr2) {
+           retryCount = 0;
 -          Chunk.release(addr, true);
++          ObjectChunk.release(addr);
+           // spin around and try again.
+           addr = addr2;
+         } else {
+           return addressToObject(addr, decompress, context);
+         }
+       } else {
+         // spin around and try again
+         long addr2 = re.getAddress();
+         retryCount++;
+         if (retryCount > 100) {
+           throw new IllegalStateException("retain failed addr=" + addr + " addr2=" + addr2 + " 100 times" + " history=" + ReferenceCountHelper.getFreeRefCountInfo(addr));
+         }
+         addr = addr2;
+         // Since retain returned false our region entry should have a different
+         // value in it. However the actual address could be the exact same one
+         // because addr was released, then reallocated from the free list and set
+         // back into this region entry. See bug 47782
+       }
+     }
+     return addressToObject(addr, decompress, context);
+   }
+   
+  
+ 
+   public static boolean isSerialized(long address) {
+     return (address & SERIALIZED_BIT) != 0;
+   }
+ 
+   public static boolean isCompressed(long address) {
+     return (address & COMPRESSED_BIT) != 0;
+   }
+   
+   private static final ThreadLocal<Object> clearNeedsToCheckForOffHeap = new ThreadLocal<Object>();
+   public static boolean doesClearNeedToCheckForOffHeap() {
+     return clearNeedsToCheckForOffHeap.get() != null;
+   }
+   public static void doWithOffHeapClear(Runnable r) {
+     clearNeedsToCheckForOffHeap.set(Boolean.TRUE);
+     try {
+       r.run();
+     } finally {
+       clearNeedsToCheckForOffHeap.remove();
+     }
+   }
+ }
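
The DataAsAddress encoding used above packs a value of at most 7 bytes directly into the 64-bit "address": the low byte carries the flag bits (ENCODED_BIT, SERIALIZED_BIT, COMPRESSED_BIT, LONG_BIT) plus a 3-bit length in bits 4-6, and the data bytes sit in the bytes above it. The following standalone round-trip mirrors the simple (non-long) branches of encodeDataAsAddress and decodeAddressToBytes; it is an illustration of the bit layout, not code from the commit.

    public class InlineAddressDemo {
      static final long ENCODED_BIT = 1L;
      static final long SIZE_MASK = 0x70L;
      static final int SIZE_SHIFT = 4;

      // mirrors the v.length < 8 branch of encodeDataAsAddress
      // (flag bits for serialized/compressed omitted for brevity)
      static long encode(byte[] v) {
        long result = 0L;
        for (int i = 0; i < v.length; i++) {
          result |= v[i] & 0xff;
          result <<= 8;
        }
        return result | (v.length << SIZE_SHIFT) | ENCODED_BIT;
      }

      // mirrors the non-long branch of decodeAddressToBytes
      static byte[] decode(long addr) {
        int size = (int) ((addr & SIZE_MASK) >> SIZE_SHIFT);
        byte[] bytes = new byte[size];
        for (int i = size - 1; i >= 0; i--) {
          addr >>= 8;
          bytes[i] = (byte) (addr & 0xff);
        }
        return bytes;
      }

      public static void main(String[] args) {
        byte[] value = {0x12, 0x34, 0x56};  // up to 7 bytes fit inline
        long addr = encode(value);          // 0x12345631: data bytes, then size 3 in bits 4-6, then ENCODED_BIT
        System.out.println(Long.toHexString(addr));
        System.out.println(java.util.Arrays.equals(value, decode(addr)));  // true
      }
    }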

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorImpl.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorImpl.java
index 0000000,12d297b..14bde59
mode 000000,100644..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorImpl.java
@@@ -1,0 -1,625 +1,511 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package com.gemstone.gemfire.internal.offheap;
+ 
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collections;
+ import java.util.Comparator;
+ import java.util.HashSet;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Set;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ 
+ import org.apache.logging.log4j.Logger;
+ 
+ import com.gemstone.gemfire.LogWriter;
+ import com.gemstone.gemfire.cache.CacheClosedException;
+ import com.gemstone.gemfire.cache.Region;
+ import com.gemstone.gemfire.cache.RegionService;
+ import com.gemstone.gemfire.internal.cache.BucketRegion;
+ import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+ import com.gemstone.gemfire.internal.cache.LocalRegion;
+ import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+ import com.gemstone.gemfire.internal.cache.PartitionedRegionDataStore;
+ import com.gemstone.gemfire.internal.cache.RegionEntry;
+ import com.gemstone.gemfire.internal.logging.LogService;
+ import com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier;
+ import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
+ 
+ /**
+  * This allocator is somewhat like an Arena allocator.
+  * We start out with an array of multiple large chunks of memory.
+  * We also keep free lists of chunks that have been allocated and then freed.
+  * An allocation will always try to find a chunk in a free list that is a close fit to the requested size.
+  * If no close fit exists then it allocates the next slice from the front of one of the original large chunks.
+  * If we cannot find enough free memory then all the existing free memory is compacted.
+  * If we still do not have enough to satisfy the allocation, an exception is thrown.
+  * 
+  * @author darrel
+  * @author Kirk Lund
+  * @since 9.0
+  */
 -public final class SimpleMemoryAllocatorImpl implements MemoryAllocator {
++public class SimpleMemoryAllocatorImpl implements MemoryAllocator {
+ 
+   static final Logger logger = LogService.getLogger();
+   
+   public static final String FREE_OFF_HEAP_MEMORY_PROPERTY = "gemfire.free-off-heap-memory";
+   
 -  /**
 -   * How many extra allocations to do for each actual slab allocation.
 -   * Is this really a good idea?
 -   */
 -  public static final int BATCH_SIZE = Integer.getInteger("gemfire.OFF_HEAP_BATCH_ALLOCATION_SIZE", 1);
 -  /**
 -   * Every allocated chunk smaller than TINY_MULTIPLE*TINY_FREE_LIST_COUNT will allocate a chunk of memory that is a multiple of this value.
 -   * Sizes are always rounded up to the next multiple of this constant
 -   * so internal fragmentation will be limited to TINY_MULTIPLE-1 bytes per allocation
 -   * and on average will be TINY_MULTIPLE/2 given a random distribution of size requests.
 -   * This does not account for the additional internal fragmentation caused by the off-heap header
 -   * which currently is always 8 bytes.
 -   */
 -  public final static int TINY_MULTIPLE = Integer.getInteger("gemfire.OFF_HEAP_ALIGNMENT", 8);
 -  /**
 -   * Number of free lists to keep for tiny allocations.
 -   */
 -  public final static int TINY_FREE_LIST_COUNT = Integer.getInteger("gemfire.OFF_HEAP_FREE_LIST_COUNT", 16384);
 -  public final static int MAX_TINY = TINY_MULTIPLE*TINY_FREE_LIST_COUNT;
 -  /**
 -   * How many unused bytes are allowed in a huge memory allocation.
 -   */
 -  public final static int HUGE_MULTIPLE = 256;
 -  
 -  volatile OffHeapMemoryStats stats;
++  private volatile OffHeapMemoryStats stats;
+   
 -  volatile OutOfOffHeapMemoryListener ooohml;
 -  
 -  /** The MemoryChunks that this allocator is managing by allocating smaller chunks of them.
 -   * The contents of this array never change.
 -   */
 -  private final UnsafeMemoryChunk[] slabs;
 -  private final long totalSlabSize;
 -  private final int largestSlab;
++  private volatile OutOfOffHeapMemoryListener ooohml;
+   
++  OutOfOffHeapMemoryListener getOutOfOffHeapMemoryListener() {
++    return this.ooohml;
++  }
++
+   public final FreeListManager freeList;
+ 
+   private MemoryInspector memoryInspector;
+ 
+   private volatile MemoryUsageListener[] memoryUsageListeners = new MemoryUsageListener[0];
+   
+   private static SimpleMemoryAllocatorImpl singleton = null;
 -  final ChunkFactory chunkFactory;
+   
+   public static SimpleMemoryAllocatorImpl getAllocator() {
+     SimpleMemoryAllocatorImpl result = singleton;
+     if (result == null) {
+       throw new CacheClosedException("Off Heap memory allocator does not exist.");
+     }
+     return result;
+   }
+ 
+   private static final boolean DO_EXPENSIVE_VALIDATION = Boolean.getBoolean("gemfire.OFF_HEAP_DO_EXPENSIVE_VALIDATION");
+   
+   public static MemoryAllocator create(OutOfOffHeapMemoryListener ooohml, OffHeapMemoryStats stats, LogWriter lw, 
+       int slabCount, long offHeapMemorySize, long maxSlabSize) {
+     return create(ooohml, stats, lw, slabCount, offHeapMemorySize, maxSlabSize,
 -        null, TINY_MULTIPLE, BATCH_SIZE, TINY_FREE_LIST_COUNT, HUGE_MULTIPLE, 
 -        new UnsafeMemoryChunk.Factory() {
++        null, new AddressableMemoryChunkFactory() {
+       @Override
 -      public UnsafeMemoryChunk create(int size) {
++      public AddressableMemoryChunk create(int size) {
+         return new UnsafeMemoryChunk(size);
+       }
+     });
+   }
+ 
+   private static SimpleMemoryAllocatorImpl create(OutOfOffHeapMemoryListener ooohml, OffHeapMemoryStats stats, LogWriter lw, 
+       int slabCount, long offHeapMemorySize, long maxSlabSize, 
 -      UnsafeMemoryChunk[] slabs, int tinyMultiple, int batchSize, int tinyFreeListCount, int hugeMultiple,
 -      UnsafeMemoryChunk.Factory memChunkFactory) {
++      AddressableMemoryChunk[] slabs, AddressableMemoryChunkFactory memChunkFactory) {
+     SimpleMemoryAllocatorImpl result = singleton;
+     boolean created = false;
+     try {
+     if (result != null) {
+       result.reuse(ooohml, lw, stats, offHeapMemorySize, slabs);
+       if (lw != null) {
 -        lw.config("Reusing " + result.getTotalMemory() + " bytes of off-heap memory. The maximum size of a single off-heap object is " + result.largestSlab + " bytes.");
++        lw.config("Reusing " + result.getTotalMemory() + " bytes of off-heap memory. The maximum size of a single off-heap object is " + result.freeList.getLargestSlabSize() + " bytes.");
+       }
+       created = true;
+       LifecycleListener.invokeAfterReuse(result);
+     } else {
+       if (slabs == null) {
+         // allocate memory chunks
+         //SimpleMemoryAllocatorImpl.cleanupPreviousAllocator();
+         if (lw != null) {
+           lw.config("Allocating " + offHeapMemorySize + " bytes of off-heap memory. The maximum size of a single off-heap object is " + maxSlabSize + " bytes.");
+         }
+         slabs = new UnsafeMemoryChunk[slabCount];
+         long uncreatedMemory = offHeapMemorySize;
+         for (int i=0; i < slabCount; i++) {
+           try {
+             if (uncreatedMemory >= maxSlabSize) {
+               slabs[i] = memChunkFactory.create((int) maxSlabSize);
+               uncreatedMemory -= maxSlabSize;
+             } else {
+               // the last slab can be smaller than maxSlabSize
+               slabs[i] = memChunkFactory.create((int) uncreatedMemory);
+             }
+           } catch (OutOfMemoryError err) {
+             if (i > 0) {
+               if (lw != null) {
+                 lw.severe("Off-heap memory creation failed after successfully allocating " + (i*maxSlabSize) + " bytes of off-heap memory.");
+               }
+             }
+             for (int j=0; j < i; j++) {
+               if (slabs[j] != null) {
+                 slabs[j].release();
+               }
+             }
+             throw err;
+           }
+         }
+       }
+ 
 -      result = new SimpleMemoryAllocatorImpl(ooohml, stats, slabs, tinyMultiple, batchSize, tinyFreeListCount, hugeMultiple);
++      result = new SimpleMemoryAllocatorImpl(ooohml, stats, slabs);
+       singleton = result;
+       LifecycleListener.invokeAfterCreate(result);
+       created = true;
+     }
+     } finally {
+       if (!created) {
+         if (stats != null) {
+           stats.close();
+         }
+         if (ooohml != null) {
+           ooohml.close();
+         }
+       }
+     }
+     return result;
+   }
 -  // for unit tests
 -  static SimpleMemoryAllocatorImpl create(OutOfOffHeapMemoryListener ooohml, OffHeapMemoryStats stats, LogWriter lw, 
 -      int slabCount, long offHeapMemorySize, long maxSlabSize, UnsafeMemoryChunk.Factory memChunkFactory) {
++  static SimpleMemoryAllocatorImpl createForUnitTest(OutOfOffHeapMemoryListener ooohml, OffHeapMemoryStats stats, LogWriter lw, 
++      int slabCount, long offHeapMemorySize, long maxSlabSize, AddressableMemoryChunkFactory memChunkFactory) {
+     return create(ooohml, stats, lw, slabCount, offHeapMemorySize, maxSlabSize, 
 -        null, TINY_MULTIPLE, BATCH_SIZE, TINY_FREE_LIST_COUNT, HUGE_MULTIPLE, memChunkFactory);
 -  }
 -  // for unit tests
 -  public static SimpleMemoryAllocatorImpl create(OutOfOffHeapMemoryListener oooml, OffHeapMemoryStats stats, UnsafeMemoryChunk[] slabs) {
 -    return create(oooml, stats, slabs, TINY_MULTIPLE, BATCH_SIZE, TINY_FREE_LIST_COUNT, HUGE_MULTIPLE);
++        null, memChunkFactory);
+   }
 -  // for unit tests
 -  static SimpleMemoryAllocatorImpl create(OutOfOffHeapMemoryListener oooml, OffHeapMemoryStats stats, UnsafeMemoryChunk[] slabs,
 -      int tinyMultiple, int batchSize, int tinyFreeListCount, int hugeMultiple) {
++  public static SimpleMemoryAllocatorImpl createForUnitTest(OutOfOffHeapMemoryListener oooml, OffHeapMemoryStats stats, AddressableMemoryChunk[] slabs) {
+     int slabCount = 0;
+     long offHeapMemorySize = 0;
+     long maxSlabSize = 0;
+     if (slabs != null) {
+       slabCount = slabs.length;
+       for (int i=0; i < slabCount; i++) {
+         int slabSize = slabs[i].getSize();
+         offHeapMemorySize += slabSize;
+         if (slabSize > maxSlabSize) {
+           maxSlabSize = slabSize;
+         }
+       }
+     }
 -    return create(oooml, stats, null, slabCount, offHeapMemorySize, maxSlabSize, slabs, tinyMultiple, batchSize, tinyFreeListCount, hugeMultiple, null);
++    return create(oooml, stats, null, slabCount, offHeapMemorySize, maxSlabSize, slabs, null);
+   }
+   
+   
 -  private void reuse(OutOfOffHeapMemoryListener oooml, LogWriter lw, OffHeapMemoryStats newStats, long offHeapMemorySize, UnsafeMemoryChunk[] slabs) {
++  private void reuse(OutOfOffHeapMemoryListener oooml, LogWriter lw, OffHeapMemoryStats newStats, long offHeapMemorySize, AddressableMemoryChunk[] slabs) {
+     if (isClosed()) {
+       throw new IllegalStateException("Can not reuse a closed off-heap memory manager.");
+     }
+     if (oooml == null) {
+       throw new IllegalArgumentException("OutOfOffHeapMemoryListener is null");
+     }
+     if (getTotalMemory() != offHeapMemorySize) {
+       if (lw != null) {
+         lw.warning("Using " + getTotalMemory() + " bytes of existing off-heap memory instead of the requested " + offHeapMemorySize);
+       }
+     }
 -    if (slabs != null) {
 -      // this will only happen in unit tests
 -      if (slabs != this.slabs) {
 -        // If the unit test gave us a different array
 -        // of slabs then something is wrong because we
 -        // are trying to reuse the old already allocated
 -        // array which means that the new one will never
 -        // be used. Note that this code does not bother
 -        // comparing the contents of the arrays.
 -        throw new IllegalStateException("attempted to reuse existing off-heap memory even though new off-heap memory was allocated");
 -      }
++    if (!this.freeList.okToReuse(slabs)) {
++      throw new IllegalStateException("attempted to reuse existing off-heap memory even though new off-heap memory was allocated");
+     }
+     this.ooohml = oooml;
+     newStats.initialize(this.stats);
+     this.stats = newStats;
+   }
+ 
 -  private SimpleMemoryAllocatorImpl(final OutOfOffHeapMemoryListener oooml, final OffHeapMemoryStats stats, final UnsafeMemoryChunk[] slabs,
 -      int tinyMultiple, int batchSize, int tinyFreeListCount, int hugeMultiple) {
++  private SimpleMemoryAllocatorImpl(final OutOfOffHeapMemoryListener oooml, final OffHeapMemoryStats stats, final AddressableMemoryChunk[] slabs) {
+     if (oooml == null) {
+       throw new IllegalArgumentException("OutOfOffHeapMemoryListener is null");
+     }
 -    if (tinyMultiple <= 0 || (tinyMultiple & 3) != 0) {
 -      throw new IllegalStateException("gemfire.OFF_HEAP_ALIGNMENT must be a multiple of 8.");
 -    }
 -    if (tinyMultiple > 256) {
 -      // this restriction exists because of the dataSize field in the object header.
 -      throw new IllegalStateException("gemfire.OFF_HEAP_ALIGNMENT must be <= 256 and a multiple of 8.");
 -    }
 -    if (batchSize <= 0) {
 -      throw new IllegalStateException("gemfire.OFF_HEAP_BATCH_ALLOCATION_SIZE must be >= 1.");
 -    }
 -    if (tinyFreeListCount <= 0) {
 -      throw new IllegalStateException("gemfire.OFF_HEAP_FREE_LIST_COUNT must be >= 1.");
 -    }
 -    if (hugeMultiple > 256 || hugeMultiple < 0) {
 -      // this restriction exists because of the dataSize field in the object header.
 -      throw new IllegalStateException("HUGE_MULTIPLE must be >= 0 and <= 256 but it was " + hugeMultiple);
 -    }
+     
+     this.ooohml = oooml;
+     this.stats = stats;
 -    this.slabs = slabs;
 -    this.chunkFactory = new GemFireChunkFactory();
 -    
++
+     //OSProcess.printStacks(0, InternalDistributedSystem.getAnyInstance().getLogWriter(), false);
+     this.stats.setFragments(slabs.length);
 -    largestSlab = slabs[0].getSize();
 -    this.stats.setLargestFragment(largestSlab);
 -    long total = 0;
 -    for (int i=0; i < slabs.length; i++) {
 -      //debugLog("slab"+i + " @" + Long.toHexString(slabs[i].getMemoryAddress()), false);
 -      //UnsafeMemoryChunk.clearAbsolute(slabs[i].getMemoryAddress(), slabs[i].getSize()); // HACK to see what this does to bug 47883
 -      total += slabs[i].getSize();
 -    }
 -    totalSlabSize = total;
 -    this.stats.incMaxMemory(this.totalSlabSize);
 -    this.stats.incFreeMemory(this.totalSlabSize);
++    this.stats.setLargestFragment(slabs[0].getSize());
+     
 -    this.freeList = new FreeListManager(this);
++    this.freeList = new FreeListManager(this, slabs);
+     this.memoryInspector = new MemoryInspectorImpl(this.freeList);
++
++    this.stats.incMaxMemory(this.freeList.getTotalMemory());
++    this.stats.incFreeMemory(this.freeList.getTotalMemory());
+   }
+   
 -  public List<Chunk> getLostChunks() {
 -    List<Chunk> liveChunks = this.freeList.getLiveChunks();
 -    List<Chunk> regionChunks = getRegionLiveChunks();
 -    Set<Chunk> liveChunksSet = new HashSet<>(liveChunks);
 -    Set<Chunk> regionChunksSet = new HashSet<>(regionChunks);
++  public List<ObjectChunk> getLostChunks() {
++    List<ObjectChunk> liveChunks = this.freeList.getLiveChunks();
++    List<ObjectChunk> regionChunks = getRegionLiveChunks();
++    Set<ObjectChunk> liveChunksSet = new HashSet<>(liveChunks);
++    Set<ObjectChunk> regionChunksSet = new HashSet<>(regionChunks);
+     liveChunksSet.removeAll(regionChunksSet);
 -    return new ArrayList<Chunk>(liveChunksSet);
++    return new ArrayList<ObjectChunk>(liveChunksSet);
+   }
+   
+   /**
+    * Returns a possibly empty list that contains all the Chunks used by regions.
+    */
 -  private List<Chunk> getRegionLiveChunks() {
 -    ArrayList<Chunk> result = new ArrayList<Chunk>();
++  private List<ObjectChunk> getRegionLiveChunks() {
++    ArrayList<ObjectChunk> result = new ArrayList<ObjectChunk>();
+     RegionService gfc = GemFireCacheImpl.getInstance();
+     if (gfc != null) {
+       Iterator<Region<?,?>> rootIt = gfc.rootRegions().iterator();
+       while (rootIt.hasNext()) {
+         Region<?,?> rr = rootIt.next();
+         getRegionLiveChunks(rr, result);
+         Iterator<Region<?,?>> srIt = rr.subregions(true).iterator();
+         while (srIt.hasNext()) {
+           getRegionLiveChunks(srIt.next(), result);
+         }
+       }
+     }
+     return result;
+   }
+ 
 -  private void getRegionLiveChunks(Region<?,?> r, List<Chunk> result) {
++  private void getRegionLiveChunks(Region<?,?> r, List<ObjectChunk> result) {
+     if (r.getAttributes().getOffHeap()) {
+ 
+       if (r instanceof PartitionedRegion) {
+         PartitionedRegionDataStore prs = ((PartitionedRegion) r).getDataStore();
+         if (prs != null) {
+           Set<BucketRegion> brs = prs.getAllLocalBucketRegions();
+           if (brs != null) {
+             for (BucketRegion br : brs) {
+               if (br != null && !br.isDestroyed()) {
+                 this.basicGetRegionLiveChunks(br, result);
+               }
+ 
+             }
+           }
+         }
+       } else {
+         this.basicGetRegionLiveChunks((LocalRegion) r, result);
+       }
+ 
+     }
+ 
+   }
+   
 -  private void basicGetRegionLiveChunks(LocalRegion r, List<Chunk> result) {
++  private void basicGetRegionLiveChunks(LocalRegion r, List<ObjectChunk> result) {
+     for (Object key : r.keySet()) {
+       RegionEntry re = ((LocalRegion) r).getRegionEntry(key);
+       if (re != null) {
+         /**
+          * value could be GATEWAY_SENDER_EVENT_IMPL_VALUE or region entry value.
+          */
+         @Unretained(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE)
+         Object value = re._getValue();
 -        if (value instanceof Chunk) {
 -          result.add((Chunk) value);
++        if (value instanceof ObjectChunk) {
++          result.add((ObjectChunk) value);
+         }
+       }
+     }
+   }
+ 
 -  @Override
 -  public MemoryChunk allocate(int size, ChunkType chunkType) {
 -    //System.out.println("allocating " + size);
 -    Chunk result = this.freeList.allocate(size, chunkType);
 -    //("allocated off heap object of size " + size + " @" + Long.toHexString(result.getMemoryAddress()), true);
++  private ObjectChunk allocateChunk(int size) {
++    ObjectChunk result = this.freeList.allocate(size);
++    int resultSize = result.getSize();
++    stats.incObjects(1);
++    stats.incUsedMemory(resultSize);
++    stats.incFreeMemory(-resultSize);
++    notifyListeners();
+     if (ReferenceCountHelper.trackReferenceCounts()) {
+       ReferenceCountHelper.refCountChanged(result.getMemoryAddress(), false, 1);
+     }
+     return result;
+   }
+   
++  @Override
++  public MemoryChunk allocate(int size) {
++    //System.out.println("allocating " + size);
++    ObjectChunk result = allocateChunk(size);
++    //("allocated off heap object of size " + size + " @" + Long.toHexString(result.getMemoryAddress()), true);
++    return result;
++  }
++  
+   public static void debugLog(String msg, boolean logStack) {
+     if (logStack) {
+       logger.info(msg, new RuntimeException(msg));
+     } else {
+       logger.info(msg);
+     }
+   }
+   
+   @Override
 -  public StoredObject allocateAndInitialize(byte[] v, boolean isSerialized, boolean isCompressed, ChunkType chunkType) {
++  public StoredObject allocateAndInitialize(byte[] v, boolean isSerialized, boolean isCompressed) {
+     long addr = OffHeapRegionEntryHelper.encodeDataAsAddress(v, isSerialized, isCompressed);
+     if (addr != 0L) {
+       return new DataAsAddress(addr);
+     }
 -    if (chunkType == null) {
 -      chunkType = GemFireChunk.TYPE;
 -    }
 -
 -    Chunk result = this.freeList.allocate(v.length, chunkType);
++    ObjectChunk result = allocateChunk(v.length);
+     //debugLog("allocated off heap object of size " + v.length + " @" + Long.toHexString(result.getMemoryAddress()), true);
+     //debugLog("allocated off heap object of size " + v.length + " @" + Long.toHexString(result.getMemoryAddress()) +  "chunkSize=" + result.getSize() + " isSerialized=" + isSerialized + " v=" + Arrays.toString(v), true);
 -    if (ReferenceCountHelper.trackReferenceCounts()) {
 -      ReferenceCountHelper.refCountChanged(result.getMemoryAddress(), false, 1);
 -    }
 -    assert result.getChunkType() == chunkType: "chunkType=" + chunkType + " getChunkType()=" + result.getChunkType();
+     result.setSerializedValue(v);
+     result.setSerialized(isSerialized);
+     result.setCompressed(isCompressed);
+     return result;
+   }
+   
+   @Override
+   public long getFreeMemory() {
+     return this.freeList.getFreeMemory();
+   }
+ 
+   @Override
+   public long getUsedMemory() {
+     return this.freeList.getUsedMemory();
+   }
+ 
+   @Override
+   public long getTotalMemory() {
 -    return totalSlabSize;
++    return this.freeList.getTotalMemory();
+   }
+   
+   @Override
+   public void close() {
+     try {
+       LifecycleListener.invokeBeforeClose(this);
+     } finally {
+       this.ooohml.close();
+       if (Boolean.getBoolean(FREE_OFF_HEAP_MEMORY_PROPERTY)) {
+         realClose();
+       }
+     }
+   }
+   
+   public static void freeOffHeapMemory() {
+     SimpleMemoryAllocatorImpl ma = singleton;
+     if (ma != null) {
+       ma.realClose();
+     }
+   }
+   
+   private void realClose() {
+     // Removing this memory immediately can lead to a SEGV. See 47885.
+     if (setClosed()) {
 -      freeSlabs(this.slabs);
++      this.freeList.freeSlabs();
+       this.stats.close();
+       singleton = null;
+     }
+   }
+   
+   private final AtomicBoolean closed = new AtomicBoolean();
+   private boolean isClosed() {
+     return this.closed.get();
+   }
+   /**
+    * Returns true if caller is the one who should close; false if some other thread
+    * is already closing.
+    */
+   private boolean setClosed() {
+     return this.closed.compareAndSet(false, true);
+   }
+   
+ 
 -  private static void freeSlabs(final UnsafeMemoryChunk[] slabs) {
 -    //debugLog("called freeSlabs", false);
 -    for (int i=0; i < slabs.length; i++) {
 -      slabs[i].release();
 -    }
 -  }
 -  
 -  void freeChunk(long addr) {
 -    this.freeList.free(addr);
 -  }
 -  
 -  protected UnsafeMemoryChunk[] getSlabs() {
 -    return this.slabs;
++  FreeListManager getFreeListManager() {
++    return this.freeList;
+   }
+   
+   /**
+    * Return the slabId of the slab that contains the given addr.
+    */
+   int findSlab(long addr) {
 -    for (int i=0; i < this.slabs.length; i++) {
 -      UnsafeMemoryChunk slab = this.slabs[i];
 -      long slabAddr = slab.getMemoryAddress();
 -      if (addr >= slabAddr) {
 -        if (addr < slabAddr + slab.getSize()) {
 -          return i;
 -        }
 -      }
 -    }
 -    throw new IllegalStateException("could not find a slab for addr " + addr);
++    return this.freeList.findSlab(addr);
+   }
+   
+   public OffHeapMemoryStats getStats() {
+     return this.stats;
+   }
+   
 -  public ChunkFactory getChunkFactory() {
 -    return this.chunkFactory;
 -  }
 -
+   @Override
+   public void addMemoryUsageListener(final MemoryUsageListener listener) {
+     synchronized (this.memoryUsageListeners) {
+       final MemoryUsageListener[] newMemoryUsageListeners = Arrays.copyOf(this.memoryUsageListeners, this.memoryUsageListeners.length + 1);
+       newMemoryUsageListeners[this.memoryUsageListeners.length] = listener;
+       this.memoryUsageListeners = newMemoryUsageListeners;
+     }
+   }
+   
+   @Override
+   public void removeMemoryUsageListener(final MemoryUsageListener listener) {
+     synchronized (this.memoryUsageListeners) {
+       int listenerIndex = -1;
+       for (int i = 0; i < this.memoryUsageListeners.length; i++) {
+         if (this.memoryUsageListeners[i] == listener) {
+           listenerIndex = i;
+           break;
+         }
+       }
+ 
+       if (listenerIndex != -1) {
+         final MemoryUsageListener[] newMemoryUsageListeners = new MemoryUsageListener[this.memoryUsageListeners.length - 1];
+         System.arraycopy(this.memoryUsageListeners, 0, newMemoryUsageListeners, 0, listenerIndex);
+         System.arraycopy(this.memoryUsageListeners, listenerIndex + 1, newMemoryUsageListeners, listenerIndex,
+             this.memoryUsageListeners.length - listenerIndex - 1);
+         this.memoryUsageListeners = newMemoryUsageListeners;
+       }
+     }
+   }
+   
+   void notifyListeners() {
+     final MemoryUsageListener[] savedListeners = this.memoryUsageListeners;
+     
+     if (savedListeners.length == 0) {
+       return;
+     }
+ 
+     final long bytesUsed = getUsedMemory();
+     for (int i = 0; i < savedListeners.length; i++) {
+       savedListeners[i].updateMemoryUsed(bytesUsed);
+     }
+   }
+   
+   static void validateAddress(long addr) {
+     validateAddressAndSize(addr, -1);
+   }
+   
+   static void validateAddressAndSize(long addr, int size) {
+     // if the caller does not have a "size" to provide then use -1
+     if ((addr & 7) != 0) {
+       StringBuilder sb = new StringBuilder();
+       sb.append("address was not 8 byte aligned: 0x").append(Long.toString(addr, 16));
+       SimpleMemoryAllocatorImpl ma = SimpleMemoryAllocatorImpl.singleton;
+       if (ma != null) {
+         sb.append(". Valid addresses must be in one of the following ranges: ");
 -        for (int i=0; i < ma.slabs.length; i++) {
 -          long startAddr = ma.slabs[i].getMemoryAddress();
 -          long endAddr = startAddr + ma.slabs[i].getSize();
 -          sb.append("[").append(Long.toString(startAddr, 16)).append("..").append(Long.toString(endAddr, 16)).append("] ");
 -        }
 -      }
++        ma.freeList.getSlabDescriptions(sb);
++      }
+       throw new IllegalStateException(sb.toString());
+     }
+     if (addr >= 0 && addr < 1024) {
+       throw new IllegalStateException("addr was smaller than expected 0x" + addr);
+     }
+     validateAddressAndSizeWithinSlab(addr, size, DO_EXPENSIVE_VALIDATION);
+   }
+ 
+   static void validateAddressAndSizeWithinSlab(long addr, int size, boolean doExpensiveValidation) {
+     if (doExpensiveValidation) {
+       SimpleMemoryAllocatorImpl ma = SimpleMemoryAllocatorImpl.singleton;
+       if (ma != null) {
 -        for (int i=0; i < ma.slabs.length; i++) {
 -          if (ma.slabs[i].getMemoryAddress() <= addr && addr < (ma.slabs[i].getMemoryAddress() + ma.slabs[i].getSize())) {
 -            // validate addr + size is within the same slab
 -            if (size != -1) { // skip this check if size is -1
 -              if (!(ma.slabs[i].getMemoryAddress() <= (addr+size-1) && (addr+size-1) < (ma.slabs[i].getMemoryAddress() + ma.slabs[i].getSize()))) {
 -                throw new IllegalStateException(" address 0x" + Long.toString(addr+size-1, 16) + " does not address the original slab memory");
 -              }
 -            }
 -            return;
 -          }
++        if (!ma.freeList.validateAddressAndSizeWithinSlab(addr, size)) {
++          throw new IllegalStateException(" address 0x" + Long.toString(addr, 16) + " does not address the original slab memory");
+         }
 -        throw new IllegalStateException(" address 0x" + Long.toString(addr, 16) + " does not address the original slab memory");
+       }
+     }
+   }
+ 
+   public synchronized List<MemoryBlock> getOrphans() {
 -    List<Chunk> liveChunks = this.freeList.getLiveChunks();
 -    List<Chunk> regionChunks = getRegionLiveChunks();
++    List<ObjectChunk> liveChunks = this.freeList.getLiveChunks();
++    List<ObjectChunk> regionChunks = getRegionLiveChunks();
+     liveChunks.removeAll(regionChunks);
+     List<MemoryBlock> orphans = new ArrayList<MemoryBlock>();
 -    for (Chunk chunk: liveChunks) {
++    for (ObjectChunk chunk: liveChunks) {
+       orphans.add(new MemoryBlockNode(this, chunk));
+     }
+     Collections.sort(orphans,
+         new Comparator<MemoryBlock>() {
+           @Override
+           public int compare(MemoryBlock o1, MemoryBlock o2) {
+             return Long.valueOf(o1.getMemoryAddress()).compareTo(o2.getMemoryAddress());
+           }
+         });
+     //this.memoryBlocks = new WeakReference<List<MemoryBlock>>(orphans);
+     return orphans;
+   }
+ 
+   @Override
+   public MemoryInspector getMemoryInspector() {
+     return this.memoryInspector;
+   }
 -
 -  /*
 -   * Set this to "true" to perform data integrity checks on allocated and reused Chunks.  This may clobber 
 -   * performance so turn on only when necessary.
 -   */
 -  final boolean validateMemoryWithFill = Boolean.getBoolean("gemfire.validateOffHeapWithFill");
+   
+ }
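
addMemoryUsageListener and removeMemoryUsageListener above use a copy-on-write array: writers synchronize and publish a freshly copied array, while notifyListeners() takes a single volatile read of the field and then iterates without locking. Below is a self-contained sketch of that pattern; UsageListener and ListenerRegistry are illustrative names, not classes from the commit, and the sketch locks on the registry itself rather than on the array field.

    import java.util.Arrays;

    interface UsageListener {
      void updateMemoryUsed(long bytesUsed);
    }

    class ListenerRegistry {
      private volatile UsageListener[] listeners = new UsageListener[0];

      synchronized void add(UsageListener l) {
        UsageListener[] next = Arrays.copyOf(listeners, listeners.length + 1);
        next[listeners.length] = l;
        listeners = next;  // publish a new array; readers never see a partial update
      }

      synchronized void remove(UsageListener l) {
        UsageListener[] current = listeners;
        for (int i = 0; i < current.length; i++) {
          if (current[i] == l) {
            UsageListener[] next = new UsageListener[current.length - 1];
            System.arraycopy(current, 0, next, 0, i);
            System.arraycopy(current, i + 1, next, i, current.length - i - 1);
            listeners = next;
            return;
          }
        }
      }

      void fireMemoryUsed(long bytesUsed) {
        UsageListener[] snapshot = listeners;  // one volatile read, then lock-free iteration
        for (UsageListener l : snapshot) {
          l.updateMemoryUsed(bytesUsed);
        }
      }
    }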

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SyncChunkStack.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SyncChunkStack.java
index 0000000,7ba28a2..99fd96f
mode 000000,100644..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SyncChunkStack.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SyncChunkStack.java
@@@ -1,0 -1,141 +1,141 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package com.gemstone.gemfire.internal.offheap;
+ 
+ import com.gemstone.gemfire.LogWriter;
+ 
+ /**
+  * A "stack" of "chunk" instances. The chunks are not kept
+  * in java object form but instead each "chunk" is just an
+  * off-heap address.
+  * This class is used for each "tiny" free-list of the off-heap memory allocator.
+  */
+ public class SyncChunkStack {
+   // Ok to read without sync but must be synced on write
+   private volatile long topAddr;
+   
+   public SyncChunkStack(long addr) {
+     if (addr != 0L) SimpleMemoryAllocatorImpl.validateAddress(addr);
+     this.topAddr = addr;
+   }
+   public SyncChunkStack() {
+     this.topAddr = 0L;
+   }
+   public boolean isEmpty() {
+     return this.topAddr == 0L;
+   }
+   public void offer(long e) {
+     assert e != 0;
+     SimpleMemoryAllocatorImpl.validateAddress(e);
+     synchronized (this) {
 -      Chunk.setNext(e, this.topAddr);
++      ObjectChunk.setNext(e, this.topAddr);
+       this.topAddr = e;
+     }
+   }
+   public long poll() {
+     long result;
+     synchronized (this) {
+       result = this.topAddr;
+       if (result != 0L) {
 -        this.topAddr = Chunk.getNext(result);
++        this.topAddr = ObjectChunk.getNext(result);
+       }
+     }
+     return result;
+   }
+   /**
+    * Returns the address of the "top" item in this stack.
+    */
+   public long getTopAddress() {
+     return this.topAddr;
+   }
+   /**
+    * Removes all the Chunks from this stack
+    * and returns the address of the first chunk.
+    * The caller owns all the Chunks after this call.
+    */
+   public long clear() {
+     long result;
+     synchronized (this) {
+       result = this.topAddr;
+       if (result != 0L) {
+         this.topAddr = 0L;
+       }
+     }
+     return result;
+   }
+   public void logSizes(LogWriter lw, String msg) {
+     long headAddr = this.topAddr;
+     long addr;
+     boolean concurrentModDetected;
+     do {
+       concurrentModDetected = false;
+       addr = headAddr;
+       while (addr != 0L) {
 -        int curSize = Chunk.getSize(addr);
 -        addr = Chunk.getNext(addr);
++        int curSize = ObjectChunk.getSize(addr);
++        addr = ObjectChunk.getNext(addr);
+         testHookDoConcurrentModification();
+         long curHead = this.topAddr;
+         if (curHead != headAddr) {
+           headAddr = curHead;
+           concurrentModDetected = true;
+           // Someone added or removed from the stack.
+           // So we break out of the inner loop and start
+           // again at the new head.
+           break;
+         }
+         // TODO construct a single log msg
+         // that gets reset when concurrentModDetected.
+         lw.info(msg + curSize);
+       }
+     } while (concurrentModDetected);
+   }
+   public long computeTotalSize() {
+     long result;
+     long headAddr = this.topAddr;
+     long addr;
+     boolean concurrentModDetected;
+     do {
+       concurrentModDetected = false;
+       result = 0;
+       addr = headAddr;
+       while (addr != 0L) {
 -        result += Chunk.getSize(addr);
 -        addr = Chunk.getNext(addr);
++        result += ObjectChunk.getSize(addr);
++        addr = ObjectChunk.getNext(addr);
+         testHookDoConcurrentModification();
+         long curHead = this.topAddr;
+         if (curHead != headAddr) {
+           headAddr = curHead;
+           concurrentModDetected = true;
+           // Someone added or removed from the stack.
+           // So we break out of the inner loop and start
+           // again at the new head.
+           break;
+         }
+       }
+     } while (concurrentModDetected);
+     return result;
+   }
+   
+   /**
+    * Tests can override this method to perform a concurrent
+    * modification of the stack while it is being traversed.
+    * In production code it is a no-op.
+    */
+   protected void testHookDoConcurrentModification() {
+     // nothing needed in production code
+   }
+ }
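
For orientation on SyncChunkStack above: freed chunk addresses are pushed with
offer(), reused via poll(), and clear() hands the whole chain back to the caller,
who walks it through ObjectChunk.getNext(). The sketch below illustrates that
flow. It is not code from this commit: addrA and addrB are placeholders for
addresses the off-heap allocator previously handed out, and the snippet assumes it
lives in the same internal package so the ObjectChunk helpers are visible.

// Illustrative only: addrA and addrB are placeholder chunk addresses.
static void tinyFreeListExample(long addrA, long addrB) {
  SyncChunkStack tinyFreeList = new SyncChunkStack();

  tinyFreeList.offer(addrA);          // push a freed chunk; chunks are linked through
  tinyFreeList.offer(addrB);          // their off-heap headers via ObjectChunk.setNext

  long reused = tinyFreeList.poll();  // pops addrB (LIFO), or returns 0L when empty
  if (reused != 0L) {
    // hand the chunk back out to satisfy a new allocation
  }

  // Drain the stack; the caller now owns every chunk on the returned chain.
  long head = tinyFreeList.clear();
  while (head != 0L) {
    long next = ObjectChunk.getNext(head);
    // ... release "head" or move it to another free list ...
    head = next;
  }
}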

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/UnsafeMemoryChunk.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/UnsafeMemoryChunk.java
index 0000000,ed1c843..aebc459
mode 000000,100644..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/UnsafeMemoryChunk.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/UnsafeMemoryChunk.java
@@@ -1,0 -1,223 +1,217 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package com.gemstone.gemfire.internal.offheap;
+ 
+ import com.gemstone.gemfire.internal.SharedLibrary;
+ import com.gemstone.gemfire.pdx.internal.unsafe.UnsafeWrapper;
+ 
+ /**
+  * Represents a single addressable chunk of off-heap memory. The size specifies
+  * the number of bytes stored at the address.
+  * 
+  * @since 9.0
+  */
 -public class UnsafeMemoryChunk implements MemoryChunk {
++public class UnsafeMemoryChunk implements AddressableMemoryChunk {
+   private static final UnsafeWrapper unsafe;
+   private static final int ARRAY_BYTE_BASE_OFFSET;
+   private static String reason;
+   static {
+     UnsafeWrapper tmp = null;
+     try {
+       tmp = new UnsafeWrapper();
+       reason = null;
+     } catch (RuntimeException ignore) {
+       reason = ignore.toString();
+     } catch (Error ignore) {
+       reason = ignore.toString();
+     }
+     unsafe = tmp;
+     ARRAY_BYTE_BASE_OFFSET = unsafe != null ? unsafe.arrayBaseOffset(byte[].class) : 0;
+   }
+   
+   private final long data;
+   private final int size;
+   
+   public UnsafeMemoryChunk(int size) {
+     if (unsafe == null) {
+       throw new OutOfMemoryError("Off-heap memory is not available because: " + reason);
+     }
+     try {
+       this.data = unsafe.allocateMemory(size);
+       this.size = size;
+     } catch (OutOfMemoryError err) {
+       String msg = "Failed creating " + size + " bytes of off-heap memory during cache creation.";
+       if (err.getMessage() != null && !err.getMessage().isEmpty()) {
+         msg += " Cause: " + err.getMessage();
+       }
+       if (!SharedLibrary.is64Bit() && size >= (1024*1024*1024)) {
+         msg += " The JVM looks like a 32-bit one. For large amounts of off-heap memory a 64-bit JVM is needed.";
+       }
+       throw new OutOfMemoryError(msg);
+     }
+   }
+ 
+   @Override
+   public int getSize() {
+     return this.size;
+   }
+   
++  /* (non-Javadoc)
++   * @see com.gemstone.gemfire.internal.offheap.AddressableMemoryChunk#getMemoryAddress()
++   */
++  @Override
+   public long getMemoryAddress() {
+     return this.data;
+   }
+   
+   public static byte readAbsoluteByte(long addr) {
+     return unsafe.getByte(addr);
+   }
+   public static char readAbsoluteChar(long addr) {
+     return unsafe.getChar(null, addr);
+   }
+   public static short readAbsoluteShort(long addr) {
+     return unsafe.getShort(null, addr);
+   }
+   public static int readAbsoluteInt(long addr) {
+     return unsafe.getInt(null, addr);
+   }
+   public static int readAbsoluteIntVolatile(long addr) {
+     return unsafe.getIntVolatile(null, addr);
+   }
+   public static long readAbsoluteLong(long addr) {
+     return unsafe.getLong(null, addr);
+   }
+   public static long readAbsoluteLongVolatile(long addr) {
+     return unsafe.getLongVolatile(null, addr);
+   }
+ 
+   @Override
+   public byte readByte(int offset) {
+     return readAbsoluteByte(this.data+offset);
+   }
+ 
+   public static void writeAbsoluteByte(long addr, byte value) {
+     unsafe.putByte(addr, value);
+   }
+        
+   public static void writeAbsoluteInt(long addr, int value) {
+     unsafe.putInt(null, addr, value);
+   }
+   public static void writeAbsoluteIntVolatile(long addr, int value) {
+     unsafe.putIntVolatile(null, addr, value);
+   }
+   public static boolean writeAbsoluteIntVolatile(long addr, int expected, int value) {
+     return unsafe.compareAndSwapInt(null, addr, expected, value);
+   }
+   public static void writeAbsoluteLong(long addr, long value) {
+     unsafe.putLong(null, addr, value);
+   }
+   public static void writeAbsoluteLongVolatile(long addr, long value) {
+     unsafe.putLongVolatile(null, addr, value);
+   }
+   public static boolean writeAbsoluteLongVolatile(long addr, long expected, long value) {
+     return unsafe.compareAndSwapLong(null, addr, expected, value);
+   }
+ 
+   @Override
+   public void writeByte(int offset, byte value) {
+     writeAbsoluteByte(this.data+offset, value);
+   }
+ 
+   @Override
+   public void readBytes(int offset, byte[] bytes) {
+     readBytes(offset, bytes, 0, bytes.length);
+   }
+ 
+   @Override
+   public void writeBytes(int offset, byte[] bytes) {
+     writeBytes(offset, bytes, 0, bytes.length);
+   }
+ 
+   public static void readAbsoluteBytes(long addr, byte[] bytes, int bytesOffset, int size) {
+     // Throwing an Error instead of using the "assert" keyword because passing < 0 to
+     // copyMemory(...) can lead to a core dump with some JVMs and we don't want to
+     // require the -ea JVM flag.
+     if (size < 0) {
+       throw new AssertionError("Size=" + size + ", but size must be >= 0");
+     }
+     
+     assert bytesOffset >= 0 : "bytesOffset=" + bytesOffset;
+     assert bytesOffset + size <= bytes.length : "bytesOffset=" + bytesOffset + ",size=" + size + ",bytes.length=" + bytes.length;
+     
+     if (size == 0) {
+       return; // No point in wasting time copying 0 bytes
+     }
+     unsafe.copyMemory(null, addr, bytes, ARRAY_BYTE_BASE_OFFSET+bytesOffset, size);
+   }
+ 
+   @Override
+   public void readBytes(int offset, byte[] bytes, int bytesOffset, int size) {
+     readAbsoluteBytes(this.data+offset, bytes, bytesOffset, size);
+   }
+ 
+   public static void copyMemory(long srcAddr, long dstAddr, long size) {
+     unsafe.copyMemory(srcAddr, dstAddr, size);
+   }
+   
+   public static void writeAbsoluteBytes(long addr, byte[] bytes, int bytesOffset, int size) {
+     // Throwing an Error instead of using the "assert" keyword because passing < 0 to
+     // copyMemory(...) can lead to a core dump with some JVMs and we don't want to
+     // require the -ea JVM flag.
+     if (size < 0) {
+       throw new AssertionError("Size=" + size + ", but size must be >= 0");
+     }
+ 
+     assert bytesOffset >= 0 : "bytesOffset=" + bytesOffset;
+     assert bytesOffset + size <= bytes.length : "bytesOffset=" + bytesOffset + ",size=" + size + ",bytes.length=" + bytes.length;
+     
+     if (size == 0) {
+       return; // No point in wasting time copying 0 bytes
+     }
+     unsafe.copyMemory(bytes, ARRAY_BYTE_BASE_OFFSET+bytesOffset, null, addr, size);
+   }
+ 
+   public static void fill(long addr, int size, byte fill) {
+     unsafe.setMemory(addr, size, fill);
+   }
+   
+   @Override
+   public void writeBytes(int offset, byte[] bytes, int bytesOffset, int size) {
+     writeAbsoluteBytes(this.data+offset, bytes, bytesOffset, size);
+   }
+ 
+   @Override
+   public void release() {
+     unsafe.freeMemory(this.data);
+   }
+ 
+   @Override
+   public void copyBytes(int src, int dst, int size) {
+     unsafe.copyMemory(this.data+src, this.data+dst, size);
+   }
+   
+   @Override
+   public String toString() {
+     final StringBuilder sb = new StringBuilder(getClass().getSimpleName());
+     sb.append("{");
+     sb.append("MemoryAddress=").append(getMemoryAddress());
+     sb.append(", Size=").append(getSize());
+     sb.append("}");
+     return sb.toString();
+   }
 -  
 -  /**
 -   * Used to create UnsafeMemoryChunk instances.
 -   */
 -  public interface Factory {
 -    /** Create and return an UnsafeMemoryChunk.
 -     * @throws OutOfMemoryError if the create fails
 -     */
 -    public UnsafeMemoryChunk create(int size);
 -  }
+ }
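
To round out the UnsafeMemoryChunk changes, the sketch below shows the lifecycle
implied by the methods in this file: allocate, copy bytes in and out, read through
the absolute address, then release. It is an illustration only; the 1024-byte size
and the payload values are arbitrary and do not come from the commit.

// Illustrative only: sizes and payload bytes are arbitrary example values.
static void memoryChunkLifecycleExample() {
  UnsafeMemoryChunk chunk = new UnsafeMemoryChunk(1024);    // may throw OutOfMemoryError
  try {
    byte[] payload = new byte[] { 1, 2, 3, 4 };
    chunk.writeBytes(0, payload);                           // copy the array into off-heap memory

    byte[] readBack = new byte[payload.length];
    chunk.readBytes(0, readBack);                           // copy it back onto the heap

    long addr = chunk.getMemoryAddress();                   // absolute address, also usable with the
    byte first = UnsafeMemoryChunk.readAbsoluteByte(addr);  // static readAbsolute*/writeAbsolute* helpers
  } finally {
    chunk.release();                                        // free the off-heap memory exactly once
  }
}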