You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/03/15 19:38:45 UTC

[07/45] incubator-geode git commit: GEODE-982: refactor off-heap

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorImpl.java
index 14bde59..209a4a4 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorImpl.java
@@ -90,17 +90,17 @@ public class SimpleMemoryAllocatorImpl implements MemoryAllocator {
   public static MemoryAllocator create(OutOfOffHeapMemoryListener ooohml, OffHeapMemoryStats stats, LogWriter lw, 
       int slabCount, long offHeapMemorySize, long maxSlabSize) {
     return create(ooohml, stats, lw, slabCount, offHeapMemorySize, maxSlabSize,
-        null, new AddressableMemoryChunkFactory() {
+        null, new SlabFactory() {
       @Override
-      public AddressableMemoryChunk create(int size) {
-        return new UnsafeMemoryChunk(size);
+      public Slab create(int size) {
+        return new SlabImpl(size);
       }
     });
   }
 
   private static SimpleMemoryAllocatorImpl create(OutOfOffHeapMemoryListener ooohml, OffHeapMemoryStats stats, LogWriter lw, 
       int slabCount, long offHeapMemorySize, long maxSlabSize, 
-      AddressableMemoryChunk[] slabs, AddressableMemoryChunkFactory memChunkFactory) {
+      Slab[] slabs, SlabFactory slabFactory) {
     SimpleMemoryAllocatorImpl result = singleton;
     boolean created = false;
     try {
@@ -118,16 +118,16 @@ public class SimpleMemoryAllocatorImpl implements MemoryAllocator {
         if (lw != null) {
           lw.config("Allocating " + offHeapMemorySize + " bytes of off-heap memory. The maximum size of a single off-heap object is " + maxSlabSize + " bytes.");
         }
-        slabs = new UnsafeMemoryChunk[slabCount];
+        slabs = new SlabImpl[slabCount];
         long uncreatedMemory = offHeapMemorySize;
         for (int i=0; i < slabCount; i++) {
           try {
             if (uncreatedMemory >= maxSlabSize) {
-              slabs[i] = memChunkFactory.create((int) maxSlabSize);
+              slabs[i] = slabFactory.create((int) maxSlabSize);
               uncreatedMemory -= maxSlabSize;
             } else {
               // the last slab can be smaller then maxSlabSize
-              slabs[i] = memChunkFactory.create((int) uncreatedMemory);
+              slabs[i] = slabFactory.create((int) uncreatedMemory);
             }
           } catch (OutOfMemoryError err) {
             if (i > 0) {
@@ -137,7 +137,7 @@ public class SimpleMemoryAllocatorImpl implements MemoryAllocator {
             }
             for (int j=0; j < i; j++) {
               if (slabs[j] != null) {
-                slabs[j].release();
+                slabs[j].free();
               }
             }
             throw err;
@@ -163,11 +163,11 @@ public class SimpleMemoryAllocatorImpl implements MemoryAllocator {
     return result;
   }
   static SimpleMemoryAllocatorImpl createForUnitTest(OutOfOffHeapMemoryListener ooohml, OffHeapMemoryStats stats, LogWriter lw, 
-      int slabCount, long offHeapMemorySize, long maxSlabSize, AddressableMemoryChunkFactory memChunkFactory) {
+      int slabCount, long offHeapMemorySize, long maxSlabSize, SlabFactory memChunkFactory) {
     return create(ooohml, stats, lw, slabCount, offHeapMemorySize, maxSlabSize, 
         null, memChunkFactory);
   }
-  public static SimpleMemoryAllocatorImpl createForUnitTest(OutOfOffHeapMemoryListener oooml, OffHeapMemoryStats stats, AddressableMemoryChunk[] slabs) {
+  public static SimpleMemoryAllocatorImpl createForUnitTest(OutOfOffHeapMemoryListener oooml, OffHeapMemoryStats stats, Slab[] slabs) {
     int slabCount = 0;
     long offHeapMemorySize = 0;
     long maxSlabSize = 0;
@@ -185,7 +185,7 @@ public class SimpleMemoryAllocatorImpl implements MemoryAllocator {
   }
   
   
-  private void reuse(OutOfOffHeapMemoryListener oooml, LogWriter lw, OffHeapMemoryStats newStats, long offHeapMemorySize, AddressableMemoryChunk[] slabs) {
+  private void reuse(OutOfOffHeapMemoryListener oooml, LogWriter lw, OffHeapMemoryStats newStats, long offHeapMemorySize, Slab[] slabs) {
     if (isClosed()) {
       throw new IllegalStateException("Can not reuse a closed off-heap memory manager.");
     }
@@ -205,7 +205,7 @@ public class SimpleMemoryAllocatorImpl implements MemoryAllocator {
     this.stats = newStats;
   }
 
-  private SimpleMemoryAllocatorImpl(final OutOfOffHeapMemoryListener oooml, final OffHeapMemoryStats stats, final AddressableMemoryChunk[] slabs) {
+  private SimpleMemoryAllocatorImpl(final OutOfOffHeapMemoryListener oooml, final OffHeapMemoryStats stats, final Slab[] slabs) {
     if (oooml == null) {
       throw new IllegalArgumentException("OutOfOffHeapMemoryListener is null");
     }
@@ -224,20 +224,20 @@ public class SimpleMemoryAllocatorImpl implements MemoryAllocator {
     this.stats.incFreeMemory(this.freeList.getTotalMemory());
   }
   
-  public List<ObjectChunk> getLostChunks() {
-    List<ObjectChunk> liveChunks = this.freeList.getLiveChunks();
-    List<ObjectChunk> regionChunks = getRegionLiveChunks();
-    Set<ObjectChunk> liveChunksSet = new HashSet<>(liveChunks);
-    Set<ObjectChunk> regionChunksSet = new HashSet<>(regionChunks);
+  public List<OffHeapStoredObject> getLostChunks() {
+    List<OffHeapStoredObject> liveChunks = this.freeList.getLiveChunks();
+    List<OffHeapStoredObject> regionChunks = getRegionLiveChunks();
+    Set<OffHeapStoredObject> liveChunksSet = new HashSet<>(liveChunks);
+    Set<OffHeapStoredObject> regionChunksSet = new HashSet<>(regionChunks);
     liveChunksSet.removeAll(regionChunksSet);
-    return new ArrayList<ObjectChunk>(liveChunksSet);
+    return new ArrayList<OffHeapStoredObject>(liveChunksSet);
   }
   
   /**
    * Returns a possibly empty list that contains all the Chunks used by regions.
    */
-  private List<ObjectChunk> getRegionLiveChunks() {
-    ArrayList<ObjectChunk> result = new ArrayList<ObjectChunk>();
+  private List<OffHeapStoredObject> getRegionLiveChunks() {
+    ArrayList<OffHeapStoredObject> result = new ArrayList<OffHeapStoredObject>();
     RegionService gfc = GemFireCacheImpl.getInstance();
     if (gfc != null) {
       Iterator<Region<?,?>> rootIt = gfc.rootRegions().iterator();
@@ -253,7 +253,7 @@ public class SimpleMemoryAllocatorImpl implements MemoryAllocator {
     return result;
   }
 
-  private void getRegionLiveChunks(Region<?,?> r, List<ObjectChunk> result) {
+  private void getRegionLiveChunks(Region<?,?> r, List<OffHeapStoredObject> result) {
     if (r.getAttributes().getOffHeap()) {
 
       if (r instanceof PartitionedRegion) {
@@ -277,7 +277,7 @@ public class SimpleMemoryAllocatorImpl implements MemoryAllocator {
 
   }
   
-  private void basicGetRegionLiveChunks(LocalRegion r, List<ObjectChunk> result) {
+  private void basicGetRegionLiveChunks(LocalRegion r, List<OffHeapStoredObject> result) {
     for (Object key : r.keySet()) {
       RegionEntry re = ((LocalRegion) r).getRegionEntry(key);
       if (re != null) {
@@ -286,30 +286,30 @@ public class SimpleMemoryAllocatorImpl implements MemoryAllocator {
          */
         @Unretained(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE)
         Object value = re._getValue();
-        if (value instanceof ObjectChunk) {
-          result.add((ObjectChunk) value);
+        if (value instanceof OffHeapStoredObject) {
+          result.add((OffHeapStoredObject) value);
         }
       }
     }
   }
 
-  private ObjectChunk allocateChunk(int size) {
-    ObjectChunk result = this.freeList.allocate(size);
+  private OffHeapStoredObject allocateOffHeapStoredObject(int size) {
+    OffHeapStoredObject result = this.freeList.allocate(size);
     int resultSize = result.getSize();
     stats.incObjects(1);
     stats.incUsedMemory(resultSize);
     stats.incFreeMemory(-resultSize);
     notifyListeners();
     if (ReferenceCountHelper.trackReferenceCounts()) {
-      ReferenceCountHelper.refCountChanged(result.getMemoryAddress(), false, 1);
+      ReferenceCountHelper.refCountChanged(result.getAddress(), false, 1);
     }
     return result;
   }
   
   @Override
-  public MemoryChunk allocate(int size) {
+  public StoredObject allocate(int size) {
     //System.out.println("allocating " + size);
-    ObjectChunk result = allocateChunk(size);
+    OffHeapStoredObject result = allocateOffHeapStoredObject(size);
     //("allocated off heap object of size " + size + " @" + Long.toHexString(result.getMemoryAddress()), true);
     return result;
   }
@@ -324,16 +324,23 @@ public class SimpleMemoryAllocatorImpl implements MemoryAllocator {
   
   @Override
   public StoredObject allocateAndInitialize(byte[] v, boolean isSerialized, boolean isCompressed) {
+    return allocateAndInitialize(v, isSerialized, isCompressed, null);
+  }
+  @Override
+  public StoredObject allocateAndInitialize(byte[] v, boolean isSerialized, boolean isCompressed, byte[] originalHeapData) {
     long addr = OffHeapRegionEntryHelper.encodeDataAsAddress(v, isSerialized, isCompressed);
     if (addr != 0L) {
-      return new DataAsAddress(addr);
+      return new TinyStoredObject(addr);
     }
-    ObjectChunk result = allocateChunk(v.length);
+    OffHeapStoredObject result = allocateOffHeapStoredObject(v.length);
     //debugLog("allocated off heap object of size " + v.length + " @" + Long.toHexString(result.getMemoryAddress()), true);
     //debugLog("allocated off heap object of size " + v.length + " @" + Long.toHexString(result.getMemoryAddress()) +  "chunkSize=" + result.getSize() + " isSerialized=" + isSerialized + " v=" + Arrays.toString(v), true);
     result.setSerializedValue(v);
     result.setSerialized(isSerialized);
     result.setCompressed(isCompressed);
+    if (originalHeapData != null) {
+      result = new OffHeapStoredObjectWithHeapForm(result, originalHeapData);
+    }
     return result;
   }
   
@@ -485,18 +492,18 @@ public class SimpleMemoryAllocatorImpl implements MemoryAllocator {
   }
 
   public synchronized List<MemoryBlock> getOrphans() {
-    List<ObjectChunk> liveChunks = this.freeList.getLiveChunks();
-    List<ObjectChunk> regionChunks = getRegionLiveChunks();
+    List<OffHeapStoredObject> liveChunks = this.freeList.getLiveChunks();
+    List<OffHeapStoredObject> regionChunks = getRegionLiveChunks();
     liveChunks.removeAll(regionChunks);
     List<MemoryBlock> orphans = new ArrayList<MemoryBlock>();
-    for (ObjectChunk chunk: liveChunks) {
+    for (OffHeapStoredObject chunk: liveChunks) {
       orphans.add(new MemoryBlockNode(this, chunk));
     }
     Collections.sort(orphans,
         new Comparator<MemoryBlock>() {
           @Override
           public int compare(MemoryBlock o1, MemoryBlock o2) {
-            return Long.valueOf(o1.getMemoryAddress()).compareTo(o2.getMemoryAddress());
+            return Long.valueOf(o1.getAddress()).compareTo(o2.getAddress());
           }
         });
     //this.memoryBlocks = new WeakReference<List<MemoryBlock>>(orphans);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/Slab.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/Slab.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/Slab.java
new file mode 100644
index 0000000..000b8cb
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/Slab.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.offheap;
+
+/**
+ * A "slab" of memory.
+ * Slabs can be created by calling {@link AddressableMemoryManager#allocateSlab(int)}.
+ * Slabs have an address, a size, and can be freed.
+ */
+public interface Slab {
+  /**
+   * Return the address of the memory of this slab.
+   */
+  public long getMemoryAddress();
+  /**
+   * Returns the size of this slab in bytes.
+   */
+  public int getSize();
+  /**
+   * Frees any memory allocated for this slab.
+   * Note that after free is called the address of
+   * this slab should no longer be used.
+   */
+  public void free();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SlabFactory.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SlabFactory.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SlabFactory.java
new file mode 100644
index 0000000..a3f457d
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SlabFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.offheap;
+
+/**
+ * Used to create Slab instances.
+ */
+public interface SlabFactory {
+  /** Create and return a Slab
+   * @throws OutOfMemoryError if the create fails
+   */
+  public Slab create(int size);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SlabImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SlabImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SlabImpl.java
new file mode 100644
index 0000000..1c88bde
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SlabImpl.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.offheap;
+
+/**
+ * Implements the Slab interface using AddressableMemoryManager.
+ * 
+ * @since 9.0
+ */
+public class SlabImpl implements Slab {
+  private final long address;
+  private final int size;
+  
+  public SlabImpl(int size) {
+    this(AddressableMemoryManager.allocate(size), size);
+  }
+
+  public SlabImpl(long addr, int size) {
+    this.address = addr;
+    this.size = size;
+  }
+  
+  @Override
+  public int getSize() {
+    return this.size;
+  }
+  
+  @Override
+  public long getMemoryAddress() {
+    return this.address;
+  }
+  
+  @Override
+  public void free() {
+    AddressableMemoryManager.free(this.address);
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(getClass().getSimpleName());
+    sb.append("{");
+    sb.append("MemoryAddress=").append(getMemoryAddress());
+    sb.append(", Size=").append(getSize());
+    sb.append("}");
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/StoredObject.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/StoredObject.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/StoredObject.java
index 4d93a07..26cb81f 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/StoredObject.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/StoredObject.java
@@ -18,38 +18,27 @@ package com.gemstone.gemfire.internal.offheap;
 
 import java.io.DataOutput;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import com.gemstone.gemfire.internal.Sendable;
 import com.gemstone.gemfire.internal.cache.CachedDeserializable;
 import com.gemstone.gemfire.internal.offheap.annotations.Retained;
+import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
 
 /**
  * Represents an object stored in the cache.
  * Currently this interface is only used for values stored in off-heap regions.
+ * This interface provides methods that let you read and write the bytes
+ * of addressable memory used to store the bytes of the object.
+ * A reference count is used to determine if the object is still allocated.
+ * To increment the count call {@link #retain()}.
+ * To decrement the count call {@link #release()}.
  * At some point in the future it may also be used for values stored in heap regions. 
  * 
  * @author darrel
  * @since 9.0
  */
-public interface StoredObject extends Releasable, Sendable, CachedDeserializable {
-  /**
-   * Call to indicate that this object's memory is in use by the caller.
-   * The memory will stay allocated until {@link #release()} is called.
-   * It is ok for a thread other than the one that called this method to call release.
-   * This method is called implicitly at the time the chunk is allocated.
-   * Note: @Retained tells you that "this" is retained by this method.
-   * 
-   * @throws IllegalStateException if the max ref count is exceeded.
-   * @return true if we are able to retain this chunk; false if we need to retry
-   */
-  @Retained
-  public boolean retain();
-
-  /**
-   * Returns true if the value stored in this memory chunk is a serialized object. Returns false if it is a byte array.
-   */
-  public boolean isSerialized();
-
+public interface StoredObject extends Sendable, CachedDeserializable, Releasable {
   /**
    * Returns true if the value stored in this memory chunk is compressed. Returns false if it is uncompressed.
    */
@@ -92,4 +81,94 @@ public interface StoredObject extends Releasable, Sendable, CachedDeserializable
    * @throws IOException
    */
   void sendAsCachedDeserializable(DataOutput out) throws IOException;
+  
+  /**
+   * Call to indicate that this object's memory is in use by the caller.
+   * The memory will stay allocated until {@link #release()} is called.
+   * It is ok for a thread other than the one that called this method to call release.
+   * This method is called implicitly at the time the chunk is allocated.
+   * Note: @Retained tells you that "this" is retained by this method.
+   * 
+   * @throws IllegalStateException if the max ref count is exceeded.
+   * @return true if we are able to retain this chunk; false if we need to retry
+   */
+  @Retained
+  public boolean retain();
+
+  /**
+   * Returns true if this type of StoredObject uses a reference count; false otherwise.
+   */
+  public boolean hasRefCount();
+  /**
+   * Returns the number of users of this memory. If this type of StoredObject does not
+   * have a reference count then -1 is returned.
+   */
+  public int getRefCount();
+  
+  /**
+   * Returns the address of the memory used to store this object.
+   * This address may not be to the first byte of stored data since
+   * the implementation may store some internal data in the first bytes of the memory.
+   * This address can be used with AddressableMemoryManager.
+   */
+  public long getAddress();
+
+  /**
+   * Returns the total number of bytes of memory used by this stored object.
+   * This size includes any bytes used for padding and meta-information.
+   */
+  public int getSize();
+  
+  /**
+   * Returns the number of bytes of memory used to store the object.
+   * This size does not include any bytes used for padding.
+   */
+  public int getDataSize();
+  public byte readDataByte(int offset);
+  public void writeDataByte(int offset, byte value);
+  public void readDataBytes(int offset, byte[] bytes);
+  public void writeDataBytes(int offset, byte[] bytes);
+  public void readDataBytes(int offset, byte[] bytes, int bytesOffset, int size);
+  public void writeDataBytes(int offset, byte[] bytes, int bytesOffset, int size);
+  /**
+   * Returns an address that can be used to read data from this StoredObject at the given offset.
+   */
+  public long getAddressForReadingData(int offset, int size);
+  
+  /**
+   * Returns a StoredObject that acts as if its data is our data starting
+   * at the given offset and limited to the given number of bytes.
+   */
+  public StoredObject slice(int offset, int limit);
+  
+  /**
+   * Returns true if our data is equal to other's data; false otherwise.
+   */
+  public boolean checkDataEquals(StoredObject other);
+  /**
+   * Returns true if the given bytes are equal to our data bytes; false otherwise
+   */
+  public boolean checkDataEquals(byte[] serializedObj);
+
+  /**
+   * Creates and returns a direct ByteBuffer that contains the data of this stored object.
+   * Note that the returned ByteBuffer has a reference to the
+   * address of this stored object so it can only be used while this stored object is retained.
+   * @return the created direct byte buffer or null if it could not be created.
+   */
+  @Unretained
+  public ByteBuffer createDirectByteBuffer();
+  /**
+   * Returns true if the data is serialized with PDX
+   */
+  public boolean isSerializedPdxInstance();
+  
+  /**
+   * Returns a StoredObject that does not cache the heap form.
+   * If a StoredObject is going to be kept around for a while then
+   * it is good to call this so that it will not also keep the heap
+   * form in memory.
+   */
+  public StoredObject getStoredObjectWithoutHeapForm();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SyncChunkStack.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SyncChunkStack.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SyncChunkStack.java
deleted file mode 100644
index 99fd96f..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SyncChunkStack.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.offheap;
-
-import com.gemstone.gemfire.LogWriter;
-
-/**
- * A "stack" of "chunk" instances. The chunks are not kept
- * in java object form but instead each "chunk" is just an
- * off-heap address.
- * This class is used for each "tiny" free-list of the off-heap memory allocator.
- */
-public class SyncChunkStack {
-  // Ok to read without sync but must be synced on write
-  private volatile long topAddr;
-  
-  public SyncChunkStack(long addr) {
-    if (addr != 0L) SimpleMemoryAllocatorImpl.validateAddress(addr);
-    this.topAddr = addr;
-  }
-  public SyncChunkStack() {
-    this.topAddr = 0L;
-  }
-  public boolean isEmpty() {
-    return this.topAddr == 0L;
-  }
-  public void offer(long e) {
-    assert e != 0;
-    SimpleMemoryAllocatorImpl.validateAddress(e);
-    synchronized (this) {
-      ObjectChunk.setNext(e, this.topAddr);
-      this.topAddr = e;
-    }
-  }
-  public long poll() {
-    long result;
-    synchronized (this) {
-      result = this.topAddr;
-      if (result != 0L) {
-        this.topAddr = ObjectChunk.getNext(result);
-      }
-    }
-    return result;
-  }
-  /**
-   * Returns the address of the "top" item in this stack.
-   */
-  public long getTopAddress() {
-    return this.topAddr;
-  }
-  /**
-   * Removes all the Chunks from this stack
-   * and returns the address of the first chunk.
-   * The caller owns all the Chunks after this call.
-   */
-  public long clear() {
-    long result;
-    synchronized (this) {
-      result = this.topAddr;
-      if (result != 0L) {
-        this.topAddr = 0L;
-      }
-    }
-    return result;
-  }
-  public void logSizes(LogWriter lw, String msg) {
-    long headAddr = this.topAddr;
-    long addr;
-    boolean concurrentModDetected;
-    do {
-      concurrentModDetected = false;
-      addr = headAddr;
-      while (addr != 0L) {
-        int curSize = ObjectChunk.getSize(addr);
-        addr = ObjectChunk.getNext(addr);
-        testHookDoConcurrentModification();
-        long curHead = this.topAddr;
-        if (curHead != headAddr) {
-          headAddr = curHead;
-          concurrentModDetected = true;
-          // Someone added or removed from the stack.
-          // So we break out of the inner loop and start
-          // again at the new head.
-          break;
-        }
-        // TODO construct a single log msg
-        // that gets reset when concurrentModDetected.
-        lw.info(msg + curSize);
-      }
-    } while (concurrentModDetected);
-  }
-  public long computeTotalSize() {
-    long result;
-    long headAddr = this.topAddr;
-    long addr;
-    boolean concurrentModDetected;
-    do {
-      concurrentModDetected = false;
-      result = 0;
-      addr = headAddr;
-      while (addr != 0L) {
-        result += ObjectChunk.getSize(addr);
-        addr = ObjectChunk.getNext(addr);
-        testHookDoConcurrentModification();
-        long curHead = this.topAddr;
-        if (curHead != headAddr) {
-          headAddr = curHead;
-          concurrentModDetected = true;
-          // Someone added or removed from the stack.
-          // So we break out of the inner loop and start
-          // again at the new head.
-          break;
-        }
-      }
-    } while (concurrentModDetected);
-    return result;
-  }
-  
-  /**
-   * This method allows tests to override it
-   * and do a concurrent modification to the stack.
-   * For production code it will be a noop.
-   */
-  protected void testHookDoConcurrentModification() {
-    // nothing needed in production code
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/TinyStoredObject.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/TinyStoredObject.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/TinyStoredObject.java
new file mode 100644
index 0000000..e8878fa
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/TinyStoredObject.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.offheap;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.internal.cache.BytesAndBitsForCompactor;
+import com.gemstone.gemfire.internal.cache.EntryBits;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.RegionEntry;
+import com.gemstone.gemfire.internal.cache.RegionEntryContext;
+
+/**
+ * Used to represent stored objects that can be stored
+ * in the address field.
+ * The RegionEntry for an off-heap region uses a primitive
+ * long to store the off-heap address of the entry's value.
+ * If the value can be encoded as a long (i.e. its serialized
+ * representation will fit in the 8 bytes of a long without looking
+ * like an actual off-heap address) then these tiny values in an
+ * off-heap region are actually stored on the heap in the primitive
+ * long field. When these values are "objectified" they will be an
+ * instance of this class.
+ * Instances of this class have a very short lifetime.
+ */
+public class TinyStoredObject extends AbstractStoredObject {
+  private final long address;
+  
+  public TinyStoredObject(long addr) {
+    this.address = addr;
+  }
+  
+  @Override
+  public long getAddress() {
+    return this.address;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o instanceof TinyStoredObject) {
+      return getAddress() == ((TinyStoredObject) o).getAddress();
+    }
+    return false;
+  }
+  
+  @Override
+  public int hashCode() {
+    long value = getAddress();
+    return (int)(value ^ (value >>> 32));
+  }
+
+  @Override
+  public int getSizeInBytes() {
+    return 0;
+  }
+
+  public byte[] getDecompressedBytes(RegionEntryContext r) {
+    byte[] bytes = OffHeapRegionEntryHelper.decodeAddressToBytes(getAddress(), true, true);
+    if (isCompressed()) {
+        long time = r.getCachePerfStats().startDecompression();
+        bytes = r.getCompressor().decompress(bytes);
+        r.getCachePerfStats().endDecompression(time);
+    }
+    return bytes;
+  }
+
+  /**
+   * If we contain a byte[], return it.
+   * Otherwise return our serialized bytes in a byte array.
+   */
+  public byte[] getRawBytes() {
+    return OffHeapRegionEntryHelper.decodeAddressToBytes(getAddress(), true, false);
+  }
+  
+  @Override
+  public byte[] getSerializedValue() {
+    byte[] value = OffHeapRegionEntryHelper.decodeAddressToBytes(this.address, true, false);
+    if (!isSerialized()) {
+      value = EntryEventImpl.serialize(value);
+    }
+    return value;
+  }
+
+  @Override
+  public Object getDeserializedValue(Region r, RegionEntry re) {
+    return OffHeapRegionEntryHelper.decodeAddressToObject(this.address);
+  }
+
+  @Override
+  public void fillSerializedValue(BytesAndBitsForCompactor wrapper,
+      byte userBits) {
+    byte[] value;
+    if (isSerialized()) {
+      value = getSerializedValue();
+      userBits = EntryBits.setSerialized(userBits, true);
+    } else {
+      value = (byte[]) getDeserializedForReading();
+    }
+    wrapper.setData(value, userBits, value.length, true);
+  }
+
+  @Override
+  public int getValueSizeInBytes() {
+    return 0;
+  }
+  
+  @Override
+  public boolean isSerialized() {
+    return OffHeapRegionEntryHelper.isSerialized(this.address);
+  }
+
+  @Override
+  public boolean isCompressed() {
+    return OffHeapRegionEntryHelper.isCompressed(this.address);
+  }
+
+  @Override
+  public void release() {
+    // nothing needed
+  }
+
+  @Override
+  public boolean retain() {
+    return true;
+  }
+
+  @Override
+  public int getRefCount() {
+    return -1;
+  }
+
+  @Override
+  public int getSize() {
+    return Long.BYTES;
+  }
+
+  @Override
+  public int getDataSize() {
+    return OffHeapRegionEntryHelper.decodeAddressToDataSize(this.address);
+  }
+
+  @Override
+  public byte readDataByte(int offset) {
+    // TODO OFFHEAP: what if the data is compressed?
+    return getRawBytes()[offset];
+  }
+
+  @Override
+  public void writeDataByte(int offset, byte value) {
+    throw new UnsupportedOperationException("ObjectStoredAsAddress does not support modifying the data bytes");
+  }
+
+  @Override
+  public void readDataBytes(int offset, byte[] bytes) {
+    readDataBytes(offset, bytes, 0, bytes.length);
+  }
+
+  @Override
+  public void writeDataBytes(int offset, byte[] bytes) {
+    writeDataBytes(offset, bytes, 0, bytes.length);
+  }
+
+  @Override
+  public void readDataBytes(int offset, byte[] bytes, int bytesOffset, int size) {
+    // TODO OFFHEAP: what if the data is compressed?
+    byte[] src = getRawBytes();
+    int dstIdx = bytesOffset;
+    for (int i = offset; i < offset+size; i++) {
+      bytes[dstIdx++] = src[i];
+    }
+  }
+
+  @Override
+  public void writeDataBytes(int offset, byte[] bytes, int bytesOffset, int size) {
+    throw new UnsupportedOperationException("ObjectStoredAsAddress does not support modifying the data bytes");
+  }
+
+  @Override
+  public ByteBuffer createDirectByteBuffer() {
+    return null;
+  }
+
+  @Override
+  public boolean hasRefCount() {
+    return false;
+  }
+
+  @Override
+  public boolean checkDataEquals(StoredObject so) {
+    // TODO OFFHEAP: what if the data is compressed?
+    return equals(so);
+  }
+
+  @Override
+  public boolean checkDataEquals(byte[] serializedObj) {
+    // TODO OFFHEAP: what if the data is compressed?
+    byte[] myBytes = getSerializedValue();
+    return Arrays.equals(myBytes, serializedObj);
+  }
+
+  @Override
+  public long getAddressForReadingData(int offset, int size) {
+    throw new UnsupportedOperationException("ObjectStoredAsAddress does not support reading at an address");
+  }
+
+  @Override
+  public StoredObject slice(int offset, int limit) {
+    throw new UnsupportedOperationException("ObjectStoredAsAddress does not support slice");
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/UnsafeMemoryChunk.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/UnsafeMemoryChunk.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/UnsafeMemoryChunk.java
deleted file mode 100644
index aebc459..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/UnsafeMemoryChunk.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.offheap;
-
-import com.gemstone.gemfire.internal.SharedLibrary;
-import com.gemstone.gemfire.pdx.internal.unsafe.UnsafeWrapper;
-
-/**
- * Represents a single addressable chunk of off-heap memory. The size specifies
- * the number of bytes stored at the address.
- * 
- * @since 9.0
- */
-public class UnsafeMemoryChunk implements AddressableMemoryChunk {
-  private static final UnsafeWrapper unsafe;
-  private static final int ARRAY_BYTE_BASE_OFFSET;
-  private static String reason;
-  static {
-	UnsafeWrapper tmp = null;
-	try {
-	  tmp = new UnsafeWrapper();
-	  reason = null;
-	} catch (RuntimeException ignore) {
-      reason = ignore.toString();
-	} catch (Error ignore) {
-      reason = ignore.toString();
-	}
-	unsafe = tmp;
-    ARRAY_BYTE_BASE_OFFSET = unsafe != null ? unsafe.arrayBaseOffset(byte[].class) : 0;
-  }
-  
-  private final long data;
-  private final int size;
-  
-  public UnsafeMemoryChunk(int size) {
-	if (unsafe == null) {
-      throw new OutOfMemoryError("Off-heap memory is not available because: " + reason);
-	}
-    try {
-    this.data = unsafe.allocateMemory(size);
-    this.size = size;
-    } catch (OutOfMemoryError err) {
-      String msg = "Failed creating " + size + " bytes of off-heap memory during cache creation.";
-      if (err.getMessage() != null && !err.getMessage().isEmpty()) {
-        msg += " Cause: " + err.getMessage();
-      }
-      if (!SharedLibrary.is64Bit() && size >= (1024*1024*1024)) {
-        msg += " The JVM looks like a 32-bit one. For large amounts of off-heap memory a 64-bit JVM is needed.";
-      }
-      throw new OutOfMemoryError(msg);
-    }
-  }
-
-  @Override
-  public int getSize() {
-    return (int)this.size;
-  }
-  
-  /* (non-Javadoc)
-   * @see com.gemstone.gemfire.internal.offheap.AddressableMemoryChunk#getMemoryAddress()
-   */
-  @Override
-  public long getMemoryAddress() {
-    return this.data;
-  }
-  
-  public static byte readAbsoluteByte(long addr) {
-    return unsafe.getByte(addr);
-  }
-  public static char readAbsoluteChar(long addr) {
-    return unsafe.getChar(null, addr);
-  }
-  public static short readAbsoluteShort(long addr) {
-    return unsafe.getShort(null, addr);
-  }
-  public static int readAbsoluteInt(long addr) {
-    return unsafe.getInt(null, addr);
-  }
-  public static int readAbsoluteIntVolatile(long addr) {
-    return unsafe.getIntVolatile(null, addr);
-  }
-  public static long readAbsoluteLong(long addr) {
-    return unsafe.getLong(null, addr);
-  }
-  public static long readAbsoluteLongVolatile(long addr) {
-    return unsafe.getLongVolatile(null, addr);
-  }
-
-  @Override
-  public byte readByte(int offset) {
-    return readAbsoluteByte(this.data+offset);
-  }
-
-  public static void writeAbsoluteByte(long addr, byte value) {
-    unsafe.putByte(addr, value);
-  }
-       
-  public static void writeAbsoluteInt(long addr, int value) {
-    unsafe.putInt(null, addr, value);
-  }
-  public static void writeAbsoluteIntVolatile(long addr, int value) {
-    unsafe.putIntVolatile(null, addr, value);
-  }
-  public static boolean writeAbsoluteIntVolatile(long addr, int expected, int value) {
-    return unsafe.compareAndSwapInt(null, addr, expected, value);
-  }
-  public static void writeAbsoluteLong(long addr, long value) {
-    unsafe.putLong(null, addr, value);
-  }
-  public static void writeAbsoluteLongVolatile(long addr, long value) {
-    unsafe.putLongVolatile(null, addr, value);
-  }
-  public static boolean writeAbsoluteLongVolatile(long addr, long expected, long value) {
-    return unsafe.compareAndSwapLong(null, addr, expected, value);
-  }
-
-  @Override
-  public void writeByte(int offset, byte value) {
-    writeAbsoluteByte(this.data+offset, value);
-  }
-
-  @Override
-  public void readBytes(int offset, byte[] bytes) {
-    readBytes(offset, bytes, 0, bytes.length);
-  }
-
-  @Override
-  public void writeBytes(int offset, byte[] bytes) {
-    writeBytes(offset, bytes, 0, bytes.length);
-  }
-
-  public static void readAbsoluteBytes(long addr, byte[] bytes, int bytesOffset, int size) {
-    // Throwing an Error instead of using the "assert" keyword because passing < 0 to
-    // copyMemory(...) can lead to a core dump with some JVMs and we don't want to
-    // require the -ea JVM flag.
-    if (size < 0) {
-      throw new AssertionError("Size=" + size + ", but size must be >= 0");
-    }
-    
-    assert bytesOffset >= 0 : "byteOffset=" + bytesOffset;
-    assert bytesOffset + size <= bytes.length : "byteOffset=" + bytesOffset + ",size=" + size + ",bytes.length=" + bytes.length;
-    
-    if (size == 0) {
-      return; // No point in wasting time copying 0 bytes
-    }
-    unsafe.copyMemory(null, addr, bytes, ARRAY_BYTE_BASE_OFFSET+bytesOffset, size);
-  }
-
-  @Override
-  public void readBytes(int offset, byte[] bytes, int bytesOffset, int size) {
-    readAbsoluteBytes(this.data+offset, bytes, bytesOffset, size);
-  }
-
-  public static void copyMemory(long srcAddr, long dstAddr, long size) {
-    unsafe.copyMemory(srcAddr, dstAddr, size);
-  }
-  
-  public static void writeAbsoluteBytes(long addr, byte[] bytes, int bytesOffset, int size) {
-    // Throwing an Error instead of using the "assert" keyword because passing < 0 to
-    // copyMemory(...) can lead to a core dump with some JVMs and we don't want to
-    // require the -ea JVM flag.
-    if (size < 0) {
-      throw new AssertionError("Size=" + size + ", but size must be >= 0");
-    }
-
-    assert bytesOffset >= 0 : "byteOffset=" + bytesOffset;
-    assert bytesOffset + size <= bytes.length : "byteOffset=" + bytesOffset + ",size=" + size + ",bytes.length=" + bytes.length;
-    
-    if (size == 0) {
-      return; // No point in wasting time copying 0 bytes
-    }
-    unsafe.copyMemory(bytes, ARRAY_BYTE_BASE_OFFSET+bytesOffset, null, addr, size);
-  }
-
-  public static void fill(long addr, int size, byte fill) {
-    unsafe.setMemory(addr, size, fill);
-  }
-  
-  @Override
-  public void writeBytes(int offset, byte[] bytes, int bytesOffset, int size) {
-    writeAbsoluteBytes(this.data+offset, bytes, bytesOffset, size);
-  }
-
-  @Override
-  public void release() {
-    unsafe.freeMemory(this.data);
-  }
-
-  @Override
-  public void copyBytes(int src, int dst, int size) {
-    unsafe.copyMemory(this.data+src, this.data+dst, size);
-  }
-  
-  @Override
-  public String toString() {
-    final StringBuilder sb = new StringBuilder(getClass().getSimpleName());
-    sb.append("{");
-    sb.append("MemoryAddress=").append(getMemoryAddress());
-    sb.append(", Size=").append(getSize());
-    sb.append("}");
-    return sb.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/ByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/ByteBufferInputStream.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/ByteBufferInputStream.java
index cfc05f2..d7ec947 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/ByteBufferInputStream.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/ByteBufferInputStream.java
@@ -31,8 +31,8 @@ import java.nio.ByteOrder;
 
 import com.gemstone.gemfire.internal.ByteBufferWriter;
 import com.gemstone.gemfire.internal.HeapDataOutputStream;
-import com.gemstone.gemfire.internal.offheap.ObjectChunk;
-import com.gemstone.gemfire.internal.offheap.UnsafeMemoryChunk;
+import com.gemstone.gemfire.internal.offheap.AddressableMemoryManager;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
 
 /**
  * <p>
@@ -109,15 +109,15 @@ public class ByteBufferInputStream extends InputStream implements DataInput, jav
     public static ByteSource create(ByteBuffer bb) {
       return new ByteBufferByteSource(bb);
     }
-    public static ByteSource create(ObjectChunk chunk) {
-      // Since I found a way to create a DirectByteBuffer (using reflection) from a Chunk
+    public static ByteSource create(StoredObject so) {
+      // Since I found a way to create a DirectByteBuffer (using reflection) from a StoredObject
       // we might not even need the ByteSource abstraction any more.
       // But it is possible that createByteBuffer will not work on a different jdk so keep it for now.
-      ByteBuffer bb = chunk.createDirectByteBuffer();
+      ByteBuffer bb = so.createDirectByteBuffer();
       if (bb != null) {
         return create(bb);
       } else {
-        return new OffHeapByteSource(chunk);
+        return new OffHeapByteSource(so);
       }
     }
   }
@@ -323,10 +323,10 @@ public class ByteBufferInputStream extends InputStream implements DataInput, jav
   public static class OffHeapByteSource implements ByteSource {
     private int position;
     private int limit;
-    private final ObjectChunk chunk;
+    private final StoredObject chunk;
 
-    public OffHeapByteSource(ObjectChunk c) {
-      this.chunk = c;
+    public OffHeapByteSource(StoredObject so) {
+      this.chunk = so;
       this.position = 0;
       this.limit = capacity();
     }
@@ -474,17 +474,17 @@ public class ByteBufferInputStream extends InputStream implements DataInput, jav
       }
       int p = this.position;
       this.position += length;
-      this.chunk.readBytes(p, dst, offset, length);
+      this.chunk.readDataBytes(p, dst, offset, length);
     }
 
     @Override
     public byte get() {
-      return this.chunk.readByte(nextGetIndex());
+      return this.chunk.readDataByte(nextGetIndex());
     }
     @Override
     public byte get(int pos) {
       checkIndex(pos);
-      return this.chunk.readByte(pos);
+      return this.chunk.readDataByte(pos);
     }
 
     /**
@@ -513,16 +513,16 @@ public class ByteBufferInputStream extends InputStream implements DataInput, jav
       return basicGetShort(pos);
     }
     private short basicGetShort(int pos) {
-      long addr = this.chunk.getAddressForReading(pos, 2);
+      long addr = this.chunk.getAddressForReadingData(pos, 2);
       if (unaligned) {
-        short result = UnsafeMemoryChunk.readAbsoluteShort(addr);
+        short result = AddressableMemoryManager.readShort(addr);
         if (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN) {
           result = Short.reverseBytes(result);
         }
         return result;
       } else {
-        int ch1 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
-        int ch2 = UnsafeMemoryChunk.readAbsoluteByte(addr);
+        int ch1 = AddressableMemoryManager.readByte(addr++);
+        int ch2 = AddressableMemoryManager.readByte(addr);
         return (short)((ch1 << 8) + (ch2 << 0));
       }
     }
@@ -537,16 +537,16 @@ public class ByteBufferInputStream extends InputStream implements DataInput, jav
       return basicGetChar(pos);
     }
     private char basicGetChar(int pos) {
-      long addr = this.chunk.getAddressForReading(pos, 2);
+      long addr = this.chunk.getAddressForReadingData(pos, 2);
       if (unaligned) {
-        char result = UnsafeMemoryChunk.readAbsoluteChar(addr);
+        char result = AddressableMemoryManager.readChar(addr);
         if (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN) {
           result = Character.reverseBytes(result);
         }
         return result;
       } else {
-        int ch1 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
-        int ch2 = UnsafeMemoryChunk.readAbsoluteByte(addr);
+        int ch1 = AddressableMemoryManager.readByte(addr++);
+        int ch2 = AddressableMemoryManager.readByte(addr);
         return (char)((ch1 << 8) + (ch2 << 0));
       }
     }
@@ -562,18 +562,18 @@ public class ByteBufferInputStream extends InputStream implements DataInput, jav
     }
     
     private int basicGetInt(final int pos) {
-      long addr = this.chunk.getAddressForReading(pos, 4);
+      long addr = this.chunk.getAddressForReadingData(pos, 4);
       if (unaligned) {
-        int result = UnsafeMemoryChunk.readAbsoluteInt(addr);
+        int result = AddressableMemoryManager.readInt(addr);
         if (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN) {
           result = Integer.reverseBytes(result);
         }
         return result;
       } else {
-        byte b0 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
-        byte b1 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
-        byte b2 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
-        byte b3 = UnsafeMemoryChunk.readAbsoluteByte(addr);
+        byte b0 = AddressableMemoryManager.readByte(addr++);
+        byte b1 = AddressableMemoryManager.readByte(addr++);
+        byte b2 = AddressableMemoryManager.readByte(addr++);
+        byte b3 = AddressableMemoryManager.readByte(addr);
         return (b0 << 24) + ((b1 & 255) << 16) + ((b2 & 255) << 8) + ((b3 & 255) << 0);
       }
     }
@@ -588,22 +588,22 @@ public class ByteBufferInputStream extends InputStream implements DataInput, jav
       return basicGetLong(pos);
     }
     private long basicGetLong(final int pos) {
-      long addr = this.chunk.getAddressForReading(pos, 8);
+      long addr = this.chunk.getAddressForReadingData(pos, 8);
       if (unaligned) {
-        long result = UnsafeMemoryChunk.readAbsoluteLong(addr);
+        long result = AddressableMemoryManager.readLong(addr);
         if (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN) {
           result = Long.reverseBytes(result);
         }
         return result;
       } else {
-        byte b0 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
-        byte b1 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
-        byte b2 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
-        byte b3 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
-        byte b4 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
-        byte b5 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
-        byte b6 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
-        byte b7 = UnsafeMemoryChunk.readAbsoluteByte(addr);
+        byte b0 = AddressableMemoryManager.readByte(addr++);
+        byte b1 = AddressableMemoryManager.readByte(addr++);
+        byte b2 = AddressableMemoryManager.readByte(addr++);
+        byte b3 = AddressableMemoryManager.readByte(addr++);
+        byte b4 = AddressableMemoryManager.readByte(addr++);
+        byte b5 = AddressableMemoryManager.readByte(addr++);
+        byte b6 = AddressableMemoryManager.readByte(addr++);
+        byte b7 = AddressableMemoryManager.readByte(addr);
         return (((long)b0 << 56) +
             ((long)(b1 & 255) << 48) +
             ((long)(b2 & 255) << 40) +
@@ -724,7 +724,7 @@ public class ByteBufferInputStream extends InputStream implements DataInput, jav
     this.buffer = copy.buffer.duplicate();
   }
 
-  public ByteBufferInputStream(ObjectChunk blob) {
+  public ByteBufferInputStream(StoredObject blob) {
     this.buffer = ByteSourceFactory.create(blob);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/ImmutableByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/ImmutableByteBufferInputStream.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/ImmutableByteBufferInputStream.java
index d632158..ff0871a 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/ImmutableByteBufferInputStream.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/ImmutableByteBufferInputStream.java
@@ -18,7 +18,7 @@ package com.gemstone.gemfire.internal.tcp;
 
 import java.nio.ByteBuffer;
 
-import com.gemstone.gemfire.internal.offheap.ObjectChunk;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
 
 /**
  * You should only create an instance of this class if the bytes this buffer reads
@@ -67,7 +67,7 @@ public class ImmutableByteBufferInputStream extends ByteBufferInputStream {
     // for serialization
   }
   
-  public ImmutableByteBufferInputStream(ObjectChunk blob) {
+  public ImmutableByteBufferInputStream(StoredObject blob) {
     super(blob);
   }
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/util/BlobHelper.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/util/BlobHelper.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/util/BlobHelper.java
index 40015a4..28252c3 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/util/BlobHelper.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/util/BlobHelper.java
@@ -27,7 +27,7 @@ import com.gemstone.gemfire.internal.DSCODE;
 import com.gemstone.gemfire.internal.HeapDataOutputStream;
 import com.gemstone.gemfire.internal.Version;
 import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.offheap.ObjectChunk;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
 import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
 import com.gemstone.gemfire.pdx.internal.PdxInputStream;
 
@@ -140,7 +140,7 @@ public class BlobHelper {
    * If a PdxInstance is returned then it will refer to Chunk's off-heap memory
    * with an unretained reference.
    */
-  public static @Unretained Object deserializeOffHeapBlob(ObjectChunk blob) throws IOException, ClassNotFoundException {
+  public static @Unretained Object deserializeOffHeapBlob(StoredObject blob) throws IOException, ClassNotFoundException {
     Object result;
     final long start = startDeserialization();
     // For both top level and nested pdxs we just want a reference to this off-heap blob.

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/pdx/internal/PdxInputStream.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/pdx/internal/PdxInputStream.java b/geode-core/src/main/java/com/gemstone/gemfire/pdx/internal/PdxInputStream.java
index 4a5a9df..85c6cd5 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/pdx/internal/PdxInputStream.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/pdx/internal/PdxInputStream.java
@@ -26,7 +26,7 @@ import java.util.Date;
 import com.gemstone.gemfire.DataSerializer;
 import com.gemstone.gemfire.InternalGemFireException;
 import com.gemstone.gemfire.pdx.PdxSerializationException;
-import com.gemstone.gemfire.internal.offheap.ObjectChunk;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
 import com.gemstone.gemfire.internal.tcp.ByteBufferInputStream;
 import com.gemstone.gemfire.internal.tcp.ImmutableByteBufferInputStream;
 
@@ -76,7 +76,7 @@ public class PdxInputStream extends ImmutableByteBufferInputStream {
     // for serialization
   }
 
-  public PdxInputStream(ObjectChunk blob) {
+  public PdxInputStream(StoredObject blob) {
     super(blob);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java b/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
index 7a4d09e..dce68cf 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
@@ -110,8 +110,8 @@ import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
 import com.gemstone.gemfire.internal.cache.versions.VMRegionVersionVector;
 import com.gemstone.gemfire.internal.cache.versions.VersionTag;
 import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.offheap.MemoryChunkWithRefCount;
 import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
 import com.gemstone.gemfire.test.dunit.AsyncInvocation;
 import com.gemstone.gemfire.test.dunit.DistributedTestCase;
 import com.gemstone.gemfire.test.dunit.DistributedTestUtils;
@@ -2004,8 +2004,8 @@ public abstract class MultiVMRegionTestCase extends RegionTestCase {
               LocalRegion reRegion;
               reRegion = (LocalRegion) region;
               RegionEntry re = reRegion.getRegionEntry(key2);
-              MemoryChunkWithRefCount mc = (MemoryChunkWithRefCount) re._getValue();
-              assertEquals(1, mc.getRefCount());
+              StoredObject so = (StoredObject) re._getValue();
+              assertEquals(1, so.getRefCount());
               assertEquals(1, ma.getStats().getObjects());
             }
           }
@@ -2091,8 +2091,8 @@ public abstract class MultiVMRegionTestCase extends RegionTestCase {
             assertEquals(2, ma.getStats().getObjects());
             LocalRegion reRegion;
             reRegion = (LocalRegion) region;
-            MemoryChunkWithRefCount mc = (MemoryChunkWithRefCount) reRegion.getRegionEntry(key)._getValue();
-            assertEquals(1, mc.getRefCount());
+            StoredObject so = (StoredObject) reRegion.getRegionEntry(key)._getValue();
+            assertEquals(1, so.getRefCount());
           }
         }
       });
@@ -2157,8 +2157,8 @@ public abstract class MultiVMRegionTestCase extends RegionTestCase {
               assertEquals(2, ma.getStats().getObjects());
               LocalRegion reRegion;
               reRegion = (LocalRegion) region;
-              MemoryChunkWithRefCount mc = (MemoryChunkWithRefCount) reRegion.getRegionEntry(key)._getValue();
-              assertEquals(1, mc.getRefCount());
+              StoredObject so = (StoredObject) reRegion.getRegionEntry(key)._getValue();
+              assertEquals(1, so.getRefCount());
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ChunkValueWrapperJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ChunkValueWrapperJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ChunkValueWrapperJUnitTest.java
deleted file mode 100644
index b69f82e..0000000
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ChunkValueWrapperJUnitTest.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.internal.cache.DiskEntry.Helper.ChunkValueWrapper;
-import com.gemstone.gemfire.internal.cache.DiskEntry.Helper.Flushable;
-import com.gemstone.gemfire.internal.offheap.ObjectChunk;
-import com.gemstone.gemfire.internal.offheap.NullOffHeapMemoryStats;
-import com.gemstone.gemfire.internal.offheap.NullOutOfOffHeapMemoryListener;
-import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl;
-import com.gemstone.gemfire.internal.offheap.UnsafeMemoryChunk;
-import com.gemstone.gemfire.test.junit.categories.UnitTest;
-
-@Category(UnitTest.class)
-public class ChunkValueWrapperJUnitTest {
-
-  private static ChunkValueWrapper createChunkValueWrapper(byte[] bytes, boolean isSerialized) {
-    ObjectChunk c = (ObjectChunk)SimpleMemoryAllocatorImpl.getAllocator().allocateAndInitialize(bytes, isSerialized, false);
-    return new ChunkValueWrapper(c);
-  }
-
-  @Before
-  public void setUp() throws Exception {
-    SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{new UnsafeMemoryChunk(1024*1024)});
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    SimpleMemoryAllocatorImpl.freeOffHeapMemory();
-  }
-
-  @Test
-  public void testIsSerialized() {
-    assertEquals(true, createChunkValueWrapper(new byte[16], true).isSerialized());
-    assertEquals(false, createChunkValueWrapper(new byte[16], false).isSerialized());
-  }
-  
-  @Test
-  public void testGetUserBits() {
-    assertEquals((byte)1, createChunkValueWrapper(new byte[16], true).getUserBits());
-    assertEquals((byte)0, createChunkValueWrapper(new byte[16], false).getUserBits());
-  }
-  
-  @Test
-  public void testGetLength() {
-    assertEquals(32, createChunkValueWrapper(new byte[32], true).getLength());
-    assertEquals(17, createChunkValueWrapper(new byte[17], false).getLength());
-  }
-  
-  @Test
-  public void testGetBytesAsString() {
-    assertEquals("byte[0, 0, 0, 0, 0, 0, 0, 0]", createChunkValueWrapper(new byte[8], false).getBytesAsString());
-  }
-  
-  @Test
-  public void testSendTo() throws IOException {
-    final ByteBuffer bb = ByteBuffer.allocateDirect(18);
-    bb.limit(8);
-    ChunkValueWrapper vw = createChunkValueWrapper(new byte[]{1,2,3,4,5,6,7,8}, false);
-    vw.sendTo(bb, new Flushable() {
-      @Override
-      public void flush() throws IOException {
-        fail("should not have been called");
-      }
-
-      @Override
-      public void flush(ByteBuffer bb, ByteBuffer chunkbb) throws IOException {
-        fail("should not have been called");
-      }
-    });
-    assertEquals(8, bb.position());
-    bb.flip();
-    assertEquals(1, bb.get());
-    assertEquals(2, bb.get());
-    assertEquals(3, bb.get());
-    assertEquals(4, bb.get());
-    assertEquals(5, bb.get());
-    assertEquals(6, bb.get());
-    assertEquals(7, bb.get());
-    assertEquals(8, bb.get());
-    
-    bb.clear();
-    bb.limit(8);
-    vw = createChunkValueWrapper(new byte[]{1,2,3,4,5,6,7,8,9}, false);
-    final int[] flushCalls = new int[1];
-    vw.sendTo(bb, new Flushable() {
-      @Override
-      public void flush() throws IOException {
-        if (flushCalls[0] != 0) {
-          fail("expected flush to only be called once");
-        }
-        flushCalls[0]++;
-        assertEquals(8, bb.position());
-        for (int i=0; i < 8; i++) {
-          assertEquals(i+1, bb.get(i));
-        }
-        bb.clear();
-        bb.limit(8);
-      }
-
-      @Override
-      public void flush(ByteBuffer bb, ByteBuffer chunkbb) throws IOException {
-        fail("should not have been called");
-      }
-    });
-    assertEquals(1, bb.position());
-    bb.flip();
-    assertEquals(9, bb.get());
-    
-    bb.clear();
-    bb.limit(8);
-    flushCalls[0] = 0;
-    vw = createChunkValueWrapper(new byte[]{1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17}, false);
-    vw.sendTo(bb, new Flushable() {
-      @Override
-      public void flush() throws IOException {
-        if (flushCalls[0] > 1) {
-          fail("expected flush to only be called twice");
-        }
-        assertEquals(8, bb.position());
-        for (int i=0; i < 8; i++) {
-          assertEquals((flushCalls[0]*8)+i+1, bb.get(i));
-        }
-        flushCalls[0]++;
-        bb.clear();
-        bb.limit(8);
-      }
-
-      @Override
-      public void flush(ByteBuffer bb, ByteBuffer chunkbb) throws IOException {
-        fail("should not have been called");
-      }
-    });
-    assertEquals(1, bb.position());
-    bb.flip();
-    assertEquals(17, bb.get());
-    
-    // now test with a chunk that will not fit in bb.
-    bb.clear();
-    flushCalls[0] = 0;
-    bb.put((byte) 0);
-    vw = createChunkValueWrapper(new byte[]{1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19}, false);
-    vw.sendTo(bb, new Flushable() {
-      @Override
-      public void flush() throws IOException {
-        fail("should not have been called");
-      }
-
-      @Override
-      public void flush(ByteBuffer bb, ByteBuffer chunkbb) throws IOException {
-        flushCalls[0]++;
-        assertEquals(1, bb.position());
-        bb.flip();
-        assertEquals(0, bb.get());
-        assertEquals(19, chunkbb.remaining());
-        for (int i=1; i <= 19; i++) {
-          assertEquals(i, chunkbb.get());
-        }
-      }
-    });
-    assertEquals(1, flushCalls[0]);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/OffHeapTestUtil.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/OffHeapTestUtil.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/OffHeapTestUtil.java
index cbf3bf6..8fd6895 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/OffHeapTestUtil.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/OffHeapTestUtil.java
@@ -52,7 +52,7 @@ public class OffHeapTestUtil {
     }
     
     if(orphans != null && ! orphans.isEmpty()) {
-      List<RefCountChangeInfo> info = ReferenceCountHelper.getRefCountInfo(orphans.get(0).getMemoryAddress());
+      List<RefCountChangeInfo> info = ReferenceCountHelper.getRefCountInfo(orphans.get(0).getAddress());
       System.out.println("FOUND ORPHAN!!");
       System.out.println("Sample orphan: " + orphans.get(0));
       System.out.println("Orphan info: " + info);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/OffHeapValueWrapperJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/OffHeapValueWrapperJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/OffHeapValueWrapperJUnitTest.java
new file mode 100644
index 0000000..0829009
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/OffHeapValueWrapperJUnitTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.internal.cache.DiskEntry.Helper.OffHeapValueWrapper;
+import com.gemstone.gemfire.internal.cache.DiskEntry.Helper.Flushable;
+import com.gemstone.gemfire.internal.offheap.NullOffHeapMemoryStats;
+import com.gemstone.gemfire.internal.offheap.NullOutOfOffHeapMemoryListener;
+import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl;
+import com.gemstone.gemfire.internal.offheap.SlabImpl;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class OffHeapValueWrapperJUnitTest {
+
+  private static OffHeapValueWrapper createChunkValueWrapper(byte[] bytes, boolean isSerialized) {
+    StoredObject c = SimpleMemoryAllocatorImpl.getAllocator().allocateAndInitialize(bytes, isSerialized, false);
+    return new OffHeapValueWrapper(c);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new SlabImpl[]{new SlabImpl(1024*1024)});
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    SimpleMemoryAllocatorImpl.freeOffHeapMemory();
+  }
+
+  @Test
+  public void testIsSerialized() {
+    assertEquals(true, createChunkValueWrapper(new byte[16], true).isSerialized());
+    assertEquals(false, createChunkValueWrapper(new byte[16], false).isSerialized());
+  }
+  
+  @Test
+  public void testGetUserBits() {
+    assertEquals((byte)1, createChunkValueWrapper(new byte[16], true).getUserBits());
+    assertEquals((byte)0, createChunkValueWrapper(new byte[16], false).getUserBits());
+  }
+  
+  @Test
+  public void testGetLength() {
+    assertEquals(32, createChunkValueWrapper(new byte[32], true).getLength());
+    assertEquals(17, createChunkValueWrapper(new byte[17], false).getLength());
+  }
+  
+  @Test
+  public void testGetBytesAsString() {
+    assertEquals("byte[0, 0, 0, 0, 0, 0, 0, 0]", createChunkValueWrapper(new byte[8], false).getBytesAsString());
+  }
+  
+  @Test
+  public void testSendTo() throws IOException {
+    final ByteBuffer bb = ByteBuffer.allocateDirect(18);
+    bb.limit(8);
+    OffHeapValueWrapper vw = createChunkValueWrapper(new byte[]{1,2,3,4,5,6,7,8}, false);
+    vw.sendTo(bb, new Flushable() {
+      @Override
+      public void flush() throws IOException {
+        fail("should not have been called");
+      }
+
+      @Override
+      public void flush(ByteBuffer bb, ByteBuffer chunkbb) throws IOException {
+        fail("should not have been called");
+      }
+    });
+    assertEquals(8, bb.position());
+    bb.flip();
+    assertEquals(1, bb.get());
+    assertEquals(2, bb.get());
+    assertEquals(3, bb.get());
+    assertEquals(4, bb.get());
+    assertEquals(5, bb.get());
+    assertEquals(6, bb.get());
+    assertEquals(7, bb.get());
+    assertEquals(8, bb.get());
+    
+    bb.clear();
+    bb.limit(8);
+    vw = createChunkValueWrapper(new byte[]{1,2,3,4,5,6,7,8,9}, false);
+    final int[] flushCalls = new int[1];
+    vw.sendTo(bb, new Flushable() {
+      @Override
+      public void flush() throws IOException {
+        if (flushCalls[0] != 0) {
+          fail("expected flush to only be called once");
+        }
+        flushCalls[0]++;
+        assertEquals(8, bb.position());
+        for (int i=0; i < 8; i++) {
+          assertEquals(i+1, bb.get(i));
+        }
+        bb.clear();
+        bb.limit(8);
+      }
+
+      @Override
+      public void flush(ByteBuffer bb, ByteBuffer chunkbb) throws IOException {
+        fail("should not have been called");
+      }
+    });
+    assertEquals(1, bb.position());
+    bb.flip();
+    assertEquals(9, bb.get());
+    
+    bb.clear();
+    bb.limit(8);
+    flushCalls[0] = 0;
+    vw = createChunkValueWrapper(new byte[]{1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17}, false);
+    vw.sendTo(bb, new Flushable() {
+      @Override
+      public void flush() throws IOException {
+        if (flushCalls[0] > 1) {
+          fail("expected flush to only be called twice");
+        }
+        assertEquals(8, bb.position());
+        for (int i=0; i < 8; i++) {
+          assertEquals((flushCalls[0]*8)+i+1, bb.get(i));
+        }
+        flushCalls[0]++;
+        bb.clear();
+        bb.limit(8);
+      }
+
+      @Override
+      public void flush(ByteBuffer bb, ByteBuffer chunkbb) throws IOException {
+        fail("should not have been called");
+      }
+    });
+    assertEquals(1, bb.position());
+    bb.flip();
+    assertEquals(17, bb.get());
+    
+    // now test with a chunk that will not fit in bb.
+    bb.clear();
+    flushCalls[0] = 0;
+    bb.put((byte) 0);
+    vw = createChunkValueWrapper(new byte[]{1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19}, false);
+    vw.sendTo(bb, new Flushable() {
+      @Override
+      public void flush() throws IOException {
+        fail("should not have been called");
+      }
+
+      @Override
+      public void flush(ByteBuffer bb, ByteBuffer chunkbb) throws IOException {
+        flushCalls[0]++;
+        assertEquals(1, bb.position());
+        bb.flip();
+        assertEquals(0, bb.get());
+        assertEquals(19, chunkbb.remaining());
+        for (int i=1; i <= 19; i++) {
+          assertEquals(i, chunkbb.get());
+        }
+      }
+    });
+    assertEquals(1, flushCalls[0]);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/OldValueImporterTestBase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/OldValueImporterTestBase.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/OldValueImporterTestBase.java
index 690b55a..84d7fc7 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/OldValueImporterTestBase.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/OldValueImporterTestBase.java
@@ -26,12 +26,12 @@ import org.junit.Test;
 
 import com.gemstone.gemfire.internal.HeapDataOutputStream;
 import com.gemstone.gemfire.internal.cache.EntryEventImpl.OldValueImporter;
-import com.gemstone.gemfire.internal.offheap.ObjectChunk;
-import com.gemstone.gemfire.internal.offheap.DataAsAddress;
+import com.gemstone.gemfire.internal.offheap.OffHeapStoredObject;
+import com.gemstone.gemfire.internal.offheap.TinyStoredObject;
 import com.gemstone.gemfire.internal.offheap.NullOffHeapMemoryStats;
 import com.gemstone.gemfire.internal.offheap.NullOutOfOffHeapMemoryListener;
 import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl;
-import com.gemstone.gemfire.internal.offheap.UnsafeMemoryChunk;
+import com.gemstone.gemfire.internal.offheap.SlabImpl;
 import com.gemstone.gemfire.internal.util.BlobHelper;
 
 public abstract class OldValueImporterTestBase {
@@ -110,10 +110,10 @@ public abstract class OldValueImporterTestBase {
     // off-heap DataAsAddress byte array
     {
       SimpleMemoryAllocatorImpl sma =
-          SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{new UnsafeMemoryChunk(1024*1024)});
+          SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new SlabImpl[]{new SlabImpl(1024*1024)});
       try {
         byte[] baValue = new byte[] {1,2};
-        DataAsAddress baValueSO = (DataAsAddress) sma.allocateAndInitialize(baValue, false, false);
+        TinyStoredObject baValueSO = (TinyStoredObject) sma.allocateAndInitialize(baValue, false, false);
         OldValueImporter omsg = createImporter();
         omsg.importOldObject(baValueSO, false);
         hdos = new HeapDataOutputStream(bytes);
@@ -127,10 +127,10 @@ public abstract class OldValueImporterTestBase {
     // off-heap Chunk byte array
     {
       SimpleMemoryAllocatorImpl sma =
-          SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{new UnsafeMemoryChunk(1024*1024)});
+          SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new SlabImpl[]{new SlabImpl(1024*1024)});
       try {
         byte[] baValue = new byte[] {1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17};
-        ObjectChunk baValueSO = (ObjectChunk) sma.allocateAndInitialize(baValue, false, false);
+        OffHeapStoredObject baValueSO = (OffHeapStoredObject) sma.allocateAndInitialize(baValue, false, false);
         OldValueImporter omsg = createImporter();
         omsg.importOldObject(baValueSO, false);
         hdos = new HeapDataOutputStream(bytes);
@@ -144,11 +144,11 @@ public abstract class OldValueImporterTestBase {
     // off-heap DataAsAddress String
     {
       SimpleMemoryAllocatorImpl sma =
-          SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{new UnsafeMemoryChunk(1024*1024)});
+          SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new SlabImpl[]{new SlabImpl(1024*1024)});
       try {
         String baValue = "12";
         byte[] baValueBlob = BlobHelper.serializeToBlob(baValue);
-        DataAsAddress baValueSO = (DataAsAddress) sma.allocateAndInitialize(baValueBlob, true, false);
+        TinyStoredObject baValueSO = (TinyStoredObject) sma.allocateAndInitialize(baValueBlob, true, false);
         OldValueImporter omsg = createImporter();
         omsg.importOldObject(baValueSO, true);
         hdos = new HeapDataOutputStream(bytes);
@@ -162,11 +162,11 @@ public abstract class OldValueImporterTestBase {
     // off-heap Chunk String
     {
       SimpleMemoryAllocatorImpl sma =
-          SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{new UnsafeMemoryChunk(1024*1024)});
+          SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new SlabImpl[]{new SlabImpl(1024*1024)});
       try {
         String baValue = "12345678";
         byte[] baValueBlob = BlobHelper.serializeToBlob(baValue);
-        ObjectChunk baValueSO = (ObjectChunk) sma.allocateAndInitialize(baValueBlob, true, false);
+        OffHeapStoredObject baValueSO = (OffHeapStoredObject) sma.allocateAndInitialize(baValueBlob, true, false);
         OldValueImporter omsg = createImporter();
         omsg.importOldObject(baValueSO, true);
         hdos = new HeapDataOutputStream(bytes);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/MessageJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/MessageJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/MessageJUnitTest.java
index b7bd47a..8caf3f6 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/MessageJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/MessageJUnitTest.java
@@ -28,7 +28,6 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import com.gemstone.gemfire.internal.Version;
-import com.gemstone.gemfire.internal.offheap.HeapByteBufferMemoryChunkJUnitTest;
 import com.gemstone.gemfire.test.junit.categories.UnitTest;
 
 @Category(UnitTest.class)

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/ByteArrayMemoryChunkJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/ByteArrayMemoryChunkJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/ByteArrayMemoryChunkJUnitTest.java
deleted file mode 100644
index e9972a5..0000000
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/ByteArrayMemoryChunkJUnitTest.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.offheap;
-
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.test.junit.categories.UnitTest;
-
-@Category(UnitTest.class)
-public class ByteArrayMemoryChunkJUnitTest extends MemoryChunkJUnitTestBase {
-  @Override
-  protected MemoryChunk createChunk(int size) {
-    return new ByteArrayMemoryChunk(size);
-  }
-
-}