You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/03/15 01:23:55 UTC

[12/44] incubator-geode git commit: GEODE-982: refactor off-heap

GEODE-982: refactor off-heap

- removed unused MemoryChunk implementations
- collapsed OffHeapCachedDeserializable into StoredObject
- added Slab, SlabFactory, SlabImpl, and AddressableMemoryManager
- collapsed MemoryChunkWithRefCount into StoredObject
- methods that access and modify the data now have Data in their name
- collapsed AddressableStoredObject into StoredObject
- changed product code to use the StoredObject interface
  instead of internal class implementations of it
- renamed DataAsAddress to TinyStoredObject
- renamed ObjectChunk to OffHeapStoredObject
- renamed ObjectChunkWithHeapForm to OffHeapStoredObjectWithHeapForm
- renamed allocateChunk to allocateOffHeapStoredObject
- renamed FakeChunk to SearchMarker
- renamed ObjectChunkSlice to OffHeapStoredObjectSlice
- renamed SyncChunkStack to OffHeapStoredObjectAddressStack
- renamed ChunkValueWrapper to OffHeapValueWrapper


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/3087c86f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/3087c86f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/3087c86f

Branch: refs/heads/feature/GEODE-949-2
Commit: 3087c86f729785ad9fa021f4437d25ca5ef9231d
Parents: 8d7a00e
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Tue Mar 8 15:44:03 2016 -0800
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Tue Mar 8 15:44:03 2016 -0800

----------------------------------------------------------------------
 .../gemfire/cache/client/internal/PutAllOp.java |   8 +-
 .../gemfire/cache/client/internal/PutOp.java    |  15 +-
 .../internal/GetOperationContextImpl.java       |  24 +-
 .../query/internal/index/AbstractIndex.java     |   8 +-
 .../query/internal/index/DummyQRegion.java      |  14 +-
 .../cache/query/internal/index/HashIndex.java   |   6 +-
 .../internal/cache/AbstractRegionEntry.java     |  85 +-
 .../internal/cache/AbstractRegionMap.java       |  81 +-
 .../gemfire/internal/cache/BucketRegion.java    |   5 +-
 .../cache/BytesAndBitsForCompactor.java         |  18 +-
 .../internal/cache/CachedDeserializable.java    |   8 +
 .../gemfire/internal/cache/DiskEntry.java       | 113 +--
 .../gemfire/internal/cache/DiskStoreImpl.java   |   4 -
 .../internal/cache/DistributedRegion.java       |  10 -
 .../gemfire/internal/cache/EntryEventImpl.java  | 161 ++--
 .../gemfire/internal/cache/LocalRegion.java     |  11 +-
 .../gemstone/gemfire/internal/cache/Oplog.java  |  12 +-
 .../internal/cache/PartitionedRegion.java       |   1 -
 .../cache/PreferBytesCachedDeserializable.java  |  11 +-
 .../gemfire/internal/cache/RegionEntry.java     |   8 +-
 .../internal/cache/RemoteDestroyMessage.java    |   4 +-
 .../cache/SearchLoadAndWriteProcessor.java      |   8 +-
 .../cache/StoreAllCachedDeserializable.java     |  11 +-
 .../internal/cache/VMCachedDeserializable.java  |   9 +-
 .../SnappyCompressedCachedDeserializable.java   |  10 +
 .../internal/cache/partitioned/PutMessage.java  |   8 +-
 .../internal/cache/tier/sockets/Part.java       |  51 +-
 .../cache/tier/sockets/command/Get70.java       |  19 +-
 .../cache/tier/sockets/command/Request.java     |   8 +-
 .../cache/wan/GatewaySenderEventImpl.java       |  12 +-
 .../internal/offheap/AbstractStoredObject.java  |  24 +
 .../offheap/AddressableMemoryChunk.java         |  29 -
 .../offheap/AddressableMemoryChunkFactory.java  |  27 -
 .../offheap/AddressableMemoryManager.java       | 261 ++++++
 .../internal/offheap/ByteArrayMemoryChunk.java  |  77 --
 .../internal/offheap/ByteBufferMemoryChunk.java |  90 --
 .../gemfire/internal/offheap/DataAsAddress.java | 131 ---
 .../gemfire/internal/offheap/Fragment.java      |  10 +-
 .../internal/offheap/FreeListManager.java       | 180 ++--
 .../internal/offheap/MemoryAllocator.java       |  18 +-
 .../gemfire/internal/offheap/MemoryBlock.java   |   2 +-
 .../internal/offheap/MemoryBlockNode.java       |  22 +-
 .../gemfire/internal/offheap/MemoryChunk.java   |  47 -
 .../offheap/MemoryChunkWithRefCount.java        |  34 -
 .../gemfire/internal/offheap/ObjectChunk.java   | 737 ---------------
 .../internal/offheap/ObjectChunkSlice.java      |  44 -
 .../offheap/ObjectChunkWithHeapForm.java        |  40 -
 .../offheap/OffHeapCachedDeserializable.java    | 142 ---
 .../gemfire/internal/offheap/OffHeapHelper.java |  24 +-
 .../offheap/OffHeapRegionEntryHelper.java       |  28 +-
 .../internal/offheap/OffHeapStoredObject.java   | 718 +++++++++++++++
 .../OffHeapStoredObjectAddressStack.java        | 141 +++
 .../offheap/OffHeapStoredObjectSlice.java       |  44 +
 .../OffHeapStoredObjectWithHeapForm.java        |  41 +
 .../offheap/SimpleMemoryAllocatorImpl.java      |  77 +-
 .../gemstone/gemfire/internal/offheap/Slab.java |  39 +
 .../gemfire/internal/offheap/SlabFactory.java   |  27 +
 .../gemfire/internal/offheap/SlabImpl.java      |  61 ++
 .../gemfire/internal/offheap/StoredObject.java  | 117 ++-
 .../internal/offheap/SyncChunkStack.java        | 141 ---
 .../internal/offheap/TinyStoredObject.java      | 229 +++++
 .../internal/offheap/UnsafeMemoryChunk.java     | 217 -----
 .../internal/tcp/ByteBufferInputStream.java     |  74 +-
 .../tcp/ImmutableByteBufferInputStream.java     |   4 +-
 .../gemfire/internal/util/BlobHelper.java       |   4 +-
 .../gemfire/pdx/internal/PdxInputStream.java    |   4 +-
 .../gemfire/cache30/MultiVMRegionTestCase.java  |  14 +-
 .../cache/ChunkValueWrapperJUnitTest.java       | 188 ----
 .../gemfire/internal/cache/OffHeapTestUtil.java |   2 +-
 .../cache/OffHeapValueWrapperJUnitTest.java     | 188 ++++
 .../cache/OldValueImporterTestBase.java         |  22 +-
 .../cache/tier/sockets/MessageJUnitTest.java    |   1 -
 .../offheap/ByteArrayMemoryChunkJUnitTest.java  |  30 -
 .../offheap/DataAsAddressJUnitTest.java         | 368 --------
 .../DirectByteBufferMemoryChunkJUnitTest.java   |  33 -
 .../internal/offheap/FragmentJUnitTest.java     |  22 +-
 .../internal/offheap/FreeListManagerTest.java   | 322 +++----
 .../offheap/FreeListOffHeapRegionJUnitTest.java |   2 +-
 .../HeapByteBufferMemoryChunkJUnitTest.java     |  33 -
 .../offheap/LifecycleListenerJUnitTest.java     |  24 +-
 .../offheap/MemoryBlockNodeJUnitTest.java       |  48 +-
 .../offheap/MemoryChunkJUnitTestBase.java       | 290 ------
 .../internal/offheap/MemoryChunkTestSuite.java  |  32 -
 .../internal/offheap/ObjectChunkJUnitTest.java  | 902 -------------------
 .../offheap/ObjectChunkSliceJUnitTest.java      |  72 --
 .../ObjectChunkWithHeapFormJUnitTest.java       |  64 --
 .../offheap/OffHeapHelperJUnitTest.java         |  14 +-
 .../internal/offheap/OffHeapRegionBase.java     |  16 +-
 .../OffHeapRegionEntryHelperJUnitTest.java      |  88 +-
 ...ffHeapStoredObjectAddressStackJUnitTest.java | 289 ++++++
 .../offheap/OffHeapStoredObjectJUnitTest.java   | 869 ++++++++++++++++++
 .../OffHeapStoredObjectSliceJUnitTest.java      |  72 ++
 ...ffHeapStoredObjectWithHeapFormJUnitTest.java |  64 ++
 .../offheap/OffHeapValidationJUnitTest.java     |   8 +-
 .../OffHeapWriteObjectAsByteArrayJUnitTest.java |  12 +-
 .../OldFreeListOffHeapRegionJUnitTest.java      |   2 +-
 ...moryAllocatorFillPatternIntegrationTest.java |  16 +-
 ...mpleMemoryAllocatorFillPatternJUnitTest.java |  24 +-
 .../offheap/SimpleMemoryAllocatorJUnitTest.java | 114 +--
 .../internal/offheap/StoredObjectTestSuite.java |   8 +-
 .../offheap/SyncChunkStackJUnitTest.java        | 289 ------
 .../offheap/TinyMemoryBlockJUnitTest.java       |  64 +-
 .../offheap/TinyStoredObjectJUnitTest.java      | 353 ++++++++
 .../offheap/UnsafeMemoryChunkJUnitTest.java     |  87 --
 .../management/OffHeapManagementDUnitTest.java  |   8 +-
 .../OffHeapByteBufferByteSourceJUnitTest.java   |   6 +-
 .../gemfire/pdx/OffHeapByteSourceJUnitTest.java |  10 +-
 .../sanctionedDataSerializables.txt             |   2 +-
 108 files changed, 4503 insertions(+), 5217 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutAllOp.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutAllOp.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutAllOp.java
index e81cd0c..2f06589 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutAllOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutAllOp.java
@@ -45,7 +45,6 @@ import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
 import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
 import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
 import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.offheap.StoredObject;
 
 /**
  * Does a region putAll on a server
@@ -307,11 +306,12 @@ public class PutAllOp {
         getMessage().addStringOrObjPart(key);
         Object value = mapEntry.getValue();
         if (value instanceof CachedDeserializable) {
-          if (value instanceof StoredObject && !((StoredObject) value).isSerialized()) {
+          CachedDeserializable cd = (CachedDeserializable) value;
+          if (!cd.isSerialized()) {
             // it is a byte[]
-            getMessage().addObjPart(((StoredObject) value).getDeserializedForReading());
+            getMessage().addObjPart(cd.getDeserializedForReading());
           } else {
-            Object cdValue = ((CachedDeserializable)value).getValue();
+            Object cdValue = cd.getValue();
             if (cdValue instanceof byte[]) {
               getMessage().addRawPart((byte[])cdValue, true);
             } else {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutOp.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutOp.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutOp.java
index ef4c9d3..c0273c5 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutOp.java
@@ -34,7 +34,6 @@ import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
 import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
 import com.gemstone.gemfire.internal.cache.versions.VersionTag;
 import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.offheap.StoredObject;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
@@ -221,13 +220,14 @@ public class PutOp {
           }
         }
         else if (value instanceof CachedDeserializable) {
-          if (value instanceof StoredObject && !((StoredObject) value).isSerialized()) {
+          CachedDeserializable cd = (CachedDeserializable) value;
+          if (!cd.isSerialized()) {
             // it is a byte[]
             getMessage().addObjPart(Boolean.FALSE);
-            getMessage().addObjPart(((StoredObject) value).getDeserializedForReading());
+            getMessage().addObjPart(cd.getDeserializedForReading());
           } else {
             getMessage().addObjPart(Boolean.FALSE);
-            Object cdValue = ((CachedDeserializable)value).getValue();
+            Object cdValue = cd.getValue();
             if (cdValue instanceof byte[]) {
               getMessage().addRawPart((byte[])cdValue, true);
             }
@@ -283,13 +283,14 @@ public class PutOp {
         }
       }
       else if (value instanceof CachedDeserializable) {
-        if (value instanceof StoredObject && !((StoredObject) value).isSerialized()) {
+        CachedDeserializable cd = (CachedDeserializable) value;
+        if (!cd.isSerialized()) {
           // it is a byte[]
           getMessage().addObjPart(Boolean.FALSE);
-          getMessage().addObjPart(((StoredObject) value).getDeserializedForReading());
+          getMessage().addObjPart(cd.getDeserializedForReading());
         } else {
           getMessage().addObjPart(Boolean.FALSE);
-          Object cdValue = ((CachedDeserializable)value).getValue();
+          Object cdValue = cd.getValue();
           if (cdValue instanceof byte[]) {
             getMessage().addRawPart((byte[])cdValue, true);
           }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/cache/operations/internal/GetOperationContextImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/operations/internal/GetOperationContextImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/operations/internal/GetOperationContextImpl.java
index 11d9248..3257051 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/operations/internal/GetOperationContextImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/operations/internal/GetOperationContextImpl.java
@@ -18,7 +18,6 @@ package com.gemstone.gemfire.cache.operations.internal;
 
 import com.gemstone.gemfire.SerializationException;
 import com.gemstone.gemfire.cache.operations.GetOperationContext;
-import com.gemstone.gemfire.internal.offheap.ObjectChunk;
 import com.gemstone.gemfire.internal.offheap.Releasable;
 import com.gemstone.gemfire.internal.offheap.StoredObject;
 import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
@@ -68,9 +67,9 @@ public class GetOperationContextImpl extends GetOperationContext implements Rele
     super.setValue(value, isObject);
   }
 
-  private void checkForReleasedOffHeapValue(Object v) {
-    // Note that we only care about Chunk (instead of all StoredObject) because it is the only one using a refcount
-    if (this.released && v instanceof ObjectChunk) {
+  private void checkForReleasedOffHeapValue(StoredObject so) {
+    // Note that we only care about stored objects with a ref count
+    if (this.released && so.hasRefCount()) {
       throw new IllegalStateException("Attempt to access off-heap value after the OperationContext callback returned.");
     }
   }
@@ -81,8 +80,9 @@ public class GetOperationContextImpl extends GetOperationContext implements Rele
     if (result == null) {
       Object v = super.getValue();
       if (v instanceof StoredObject) {
-        checkForReleasedOffHeapValue(v);
-        result = ((StoredObject) v).getValueAsHeapByteArray();
+        StoredObject so = (StoredObject) v;
+        checkForReleasedOffHeapValue(so);
+        result = so.getValueAsHeapByteArray();
       }
     }
     return result;
@@ -92,8 +92,9 @@ public class GetOperationContextImpl extends GetOperationContext implements Rele
   public Object getDeserializedValue() throws SerializationException {
     Object result = super.getDeserializedValue();
     if (result instanceof StoredObject) {
-      checkForReleasedOffHeapValue(result);
-      result = ((StoredObject) result).getValueAsDeserializedHeapObject();
+      StoredObject so = (StoredObject) result;
+      checkForReleasedOffHeapValue(so);
+      result = so.getValueAsDeserializedHeapObject();
     }
     return result;
   }
@@ -102,9 +103,10 @@ public class GetOperationContextImpl extends GetOperationContext implements Rele
   public Object getValue() {
     Object result = super.getValue();
     if (result instanceof StoredObject) {
-      checkForReleasedOffHeapValue(result);
+      StoredObject so = (StoredObject) result;
+      checkForReleasedOffHeapValue(so);
       // since they called getValue they don't care if it is serialized or deserialized so return it as serialized
-      result = ((StoredObject) result).getValueAsHeapByteArray();
+      result = so.getValueAsHeapByteArray();
     }
     return result;
   }
@@ -116,7 +118,7 @@ public class GetOperationContextImpl extends GetOperationContext implements Rele
     // our value (since this context did not retain it)
     // but we do make sure that any future attempt to access
     // the off-heap value fails.
-    if (super.getValue() instanceof ObjectChunk) {
+    if (super.getValue() instanceof StoredObject) {
       this.released = true;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/AbstractIndex.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/AbstractIndex.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/AbstractIndex.java
index aab99cb..0923327 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/AbstractIndex.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/AbstractIndex.java
@@ -75,7 +75,7 @@ import com.gemstone.gemfire.internal.cache.RegionEntry;
 import com.gemstone.gemfire.internal.cache.persistence.query.CloseableIterator;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.offheap.ObjectChunk;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
 import com.gemstone.gemfire.internal.offheap.annotations.Released;
 import com.gemstone.gemfire.internal.offheap.annotations.Retained;
 import com.gemstone.gemfire.pdx.PdxInstance;
@@ -1477,9 +1477,9 @@ public abstract class AbstractIndex implements IndexProtocol
         valueInIndex = verifyAndGetPdxDomainObject(value);
       } else{
         @Released Object val = re.getValueInVM(context.getPartitionedRegion());
-        ObjectChunk valToFree = null;
-        if (val instanceof ObjectChunk) {
-          valToFree = (ObjectChunk)val;
+        StoredObject valToFree = null;
+        if (val instanceof StoredObject) {
+          valToFree = (StoredObject)val;
         }
         try {
         if (val instanceof CachedDeserializable) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/DummyQRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/DummyQRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/DummyQRegion.java
index bde948d..b888f99 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/DummyQRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/DummyQRegion.java
@@ -41,7 +41,7 @@ import com.gemstone.gemfire.internal.cache.LocalRegion;
 import com.gemstone.gemfire.internal.cache.RegionEntry;
 import com.gemstone.gemfire.internal.cache.RegionEntryContext;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.offheap.ObjectChunk;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
 import com.gemstone.gemfire.internal.offheap.annotations.Released;
 import com.gemstone.gemfire.internal.offheap.annotations.Retained;
 
@@ -134,8 +134,8 @@ public class DummyQRegion extends QRegion {
     }
     valueInList.clear();
     Object val = this.entry.getValueOffHeapOrDiskWithoutFaultIn((LocalRegion) getRegion());
-    if (val instanceof ObjectChunk) {
-      @Retained @Released ObjectChunk ohval = (ObjectChunk) val;
+    if (val instanceof StoredObject) {
+      @Retained @Released StoredObject ohval = (StoredObject) val;
       try {
         // TODO OFFHEAP: val may be off-heap PdxInstance
         val = ohval.getDeserializedValue(getRegion(), this.entry);
@@ -155,8 +155,8 @@ public class DummyQRegion extends QRegion {
       valueInArray = new  Object[1];      
     }   
     Object val = this.entry.getValueOffHeapOrDiskWithoutFaultIn((LocalRegion) getRegion());
-    if (val instanceof ObjectChunk) {      
-      @Retained @Released ObjectChunk ohval = (ObjectChunk) val;
+    if (val instanceof StoredObject) {      
+      @Retained @Released StoredObject ohval = (StoredObject) val;
       try {
         // TODO OFFHEAP: val may be off-heap PdxInstance
         val = ohval.getDeserializedValue(getRegion(), this.entry);
@@ -178,8 +178,8 @@ public class DummyQRegion extends QRegion {
     }
     values.clear();
     Object val = this.entry.getValueOffHeapOrDiskWithoutFaultIn((LocalRegion) getRegion());
-    if (val instanceof ObjectChunk) {
-      @Retained @Released ObjectChunk ohval = (ObjectChunk) val;
+    if (val instanceof StoredObject) {
+      @Retained @Released StoredObject ohval = (StoredObject) val;
       try {
         // TODO OFFHEAP: val may be off-heap PdxInstance
         val = ohval.getDeserializedValue(getRegion(), this.entry);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/HashIndex.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/HashIndex.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/HashIndex.java
index a30a264..f8a1b8d 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/HashIndex.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/HashIndex.java
@@ -79,7 +79,7 @@ import com.gemstone.gemfire.internal.cache.Token;
 import com.gemstone.gemfire.internal.cache.persistence.query.CloseableIterator;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.offheap.ObjectChunk;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
 import com.gemstone.gemfire.internal.offheap.annotations.Released;
 import com.gemstone.gemfire.internal.offheap.annotations.Retained;
 import com.gemstone.gemfire.pdx.internal.PdxString;
@@ -876,8 +876,8 @@ public class HashIndex extends AbstractIndex {
     if (this.indexOnValues) {
       Object o = entry.getValueOffHeapOrDiskWithoutFaultIn((LocalRegion) getRegion());
       try {
-        if (o instanceof ObjectChunk) {
-          ObjectChunk ohval = (ObjectChunk) o;
+        if (o instanceof StoredObject) {
+          StoredObject ohval = (StoredObject) o;
           try {
             o = ohval.getDeserializedForReading();
           } finally {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
index 558ea37..a103e96 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
@@ -61,11 +61,7 @@ import com.gemstone.gemfire.internal.lang.StringUtils;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
-import com.gemstone.gemfire.internal.offheap.ObjectChunk;
-import com.gemstone.gemfire.internal.offheap.ObjectChunkWithHeapForm;
-import com.gemstone.gemfire.internal.offheap.ObjectChunk;
 import com.gemstone.gemfire.internal.offheap.MemoryAllocator;
-import com.gemstone.gemfire.internal.offheap.OffHeapCachedDeserializable;
 import com.gemstone.gemfire.internal.offheap.OffHeapHelper;
 import com.gemstone.gemfire.internal.offheap.ReferenceCountHelper;
 import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl;
@@ -334,14 +330,14 @@ public abstract class AbstractRegionEntry implements RegionEntry,
 //        // For SQLFire we prefer eager deserialized
 //        dst.setEagerDeserialize();         
 //      }
-      
-      if (v instanceof StoredObject && !((StoredObject) v).isSerialized()) {
-        dst.value = ((StoredObject) v).getDeserializedForReading();
+      CachedDeserializable cd = (CachedDeserializable) v;
+      if (!cd.isSerialized()) {
+        dst.value = cd.getDeserializedForReading();
       } else {
         /*if (v instanceof ByteSource && CachedDeserializableFactory.preferObject()) {
           dst.value = v;
         } else */ {
-          Object tmp = ((CachedDeserializable) v).getValue();
+          Object tmp = cd.getValue();
           if (tmp instanceof byte[]) {
             byte[] bb = (byte[]) tmp;
             dst.value = bb;
@@ -989,10 +985,10 @@ public abstract class AbstractRegionEntry implements RegionEntry,
       return checkPdxEquals((PdxInstance)v1, v2);
     } else if (v2 instanceof PdxInstance) {
       return checkPdxEquals((PdxInstance)v2, v1);
-    } else if (v1 instanceof OffHeapCachedDeserializable) {
-      return checkOffHeapEquals((OffHeapCachedDeserializable)v1, v2);
-    } else if (v2 instanceof OffHeapCachedDeserializable) {
-      return checkOffHeapEquals((OffHeapCachedDeserializable)v2, v1);
+    } else if (v1 instanceof StoredObject) {
+      return checkOffHeapEquals((StoredObject)v1, v2);
+    } else if (v2 instanceof StoredObject) {
+      return checkOffHeapEquals((StoredObject)v2, v1);
     } else if (v1 instanceof CachedDeserializable) {
       return checkCDEquals((CachedDeserializable)v1, v2, isCompressedOffHeap);
     } else if (v2 instanceof CachedDeserializable) {
@@ -1001,35 +997,29 @@ public abstract class AbstractRegionEntry implements RegionEntry,
       return basicEquals(v1, v2);
     }
   }
-  private static boolean checkOffHeapEquals(@Unretained OffHeapCachedDeserializable cd, @Unretained Object obj) {
-    if (cd.isSerializedPdxInstance()) {
-      PdxInstance pi = InternalDataSerializer.readPdxInstance(cd.getSerializedValue(), GemFireCacheImpl.getForPdx("Could not check value equality"));
+  private static boolean checkOffHeapEquals(@Unretained StoredObject ohVal, @Unretained Object obj) {
+    if (ohVal.isSerializedPdxInstance()) {
+      PdxInstance pi = InternalDataSerializer.readPdxInstance(ohVal.getSerializedValue(), GemFireCacheImpl.getForPdx("Could not check value equality"));
       return checkPdxEquals(pi, obj);
     }
-    if (obj instanceof OffHeapCachedDeserializable) {
-      return cd.checkDataEquals((OffHeapCachedDeserializable)obj);
+    if (obj instanceof StoredObject) {
+      return ohVal.checkDataEquals((StoredObject)obj);
     } else {
       byte[] serializedObj;
       if (obj instanceof CachedDeserializable) {
-        if (!cd.isSerialized()) {
-          if (obj instanceof StoredObject && !((StoredObject) obj).isSerialized()) {
-            // both are byte[]
-            // obj must be DataAsAddress since it was not OffHeapCachedDeserializable
-            // so its byte[] will be small.
-            byte[] objBytes = (byte[]) ((StoredObject) obj).getDeserializedForReading();
-            return cd.checkDataEquals(objBytes);
-          } else {
-            return false;
-          }
+        CachedDeserializable cdObj = (CachedDeserializable) obj;
+        if (!ohVal.isSerialized()) {
+          assert cdObj.isSerialized();
+          return false;
         }
-        serializedObj = ((CachedDeserializable) obj).getSerializedValue();
+        serializedObj = cdObj.getSerializedValue();
       } else if (obj instanceof byte[]) {
-        if (cd.isSerialized()) {
+        if (ohVal.isSerialized()) {
           return false;
         }
         serializedObj = (byte[]) obj;
       } else {
-        if (!cd.isSerialized()) {
+        if (!ohVal.isSerialized()) {
           return false;
         }
         if (obj == null || obj == Token.NOT_AVAILABLE
@@ -1038,19 +1028,20 @@ public abstract class AbstractRegionEntry implements RegionEntry,
         }
         serializedObj = EntryEventImpl.serialize(obj);
       }
-      return cd.checkDataEquals(serializedObj);
+      return ohVal.checkDataEquals(serializedObj);
     }
   }
   
   private static boolean checkCDEquals(CachedDeserializable cd, Object obj, boolean isCompressedOffHeap) {
-    if (cd instanceof StoredObject && !((StoredObject) cd).isSerialized()) {
+    if (!cd.isSerialized()) {
       // cd is an actual byte[].
       byte[] ba2;
-      if (obj instanceof StoredObject) {
-        if (!((StoredObject) obj).isSerialized()) {
+      if (obj instanceof CachedDeserializable) {
+        CachedDeserializable cdObj = (CachedDeserializable) obj;
+        if (!cdObj.isSerialized()) {
           return false;
         }
-        ba2 = (byte[]) ((StoredObject) obj).getDeserializedForReading();
+        ba2 = (byte[]) cdObj.getDeserializedForReading();
       } else if (obj instanceof byte[]) {
         ba2 = (byte[]) obj;
       } else {
@@ -1128,11 +1119,12 @@ public abstract class AbstractRegionEntry implements RegionEntry,
     if (!(obj instanceof PdxInstance)) {
       // obj may be a CachedDeserializable in which case we want to convert it to a PdxInstance even if we are not readSerialized.
       if (obj instanceof CachedDeserializable) {
-        if (obj instanceof StoredObject && !((StoredObject) obj).isSerialized()) {
+        CachedDeserializable cdObj = (CachedDeserializable) obj;
+        if (!cdObj.isSerialized()) {
           // obj is actually a byte[] which will never be equal to a PdxInstance
           return false;
         }
-        Object cdVal = ((CachedDeserializable) obj).getValue();
+        Object cdVal = cdObj.getValue();
         if (cdVal instanceof byte[]) {
           byte[] cdValBytes = (byte[]) cdVal;
           PdxInstance pi = InternalDataSerializer.readPdxInstance(cdValBytes, GemFireCacheImpl.getForPdx("Could not check value equality"));
@@ -1311,9 +1303,9 @@ public abstract class AbstractRegionEntry implements RegionEntry,
           }
           return prepareValueForCache(r, heapValue, event, isEntryUpdate);
         }
-        if (val instanceof ObjectChunk) {
+        if (soVal.hasRefCount()) {
           // if the reused guy has a refcount then need to inc it
-          if (!((ObjectChunk)val).retain()) {
+          if (!soVal.retain()) {
             throw new IllegalStateException("Could not use an off heap value because it was freed");
           }
         }
@@ -1346,16 +1338,8 @@ public abstract class AbstractRegionEntry implements RegionEntry,
         boolean isCompressed = compressedData != data;
         ReferenceCountHelper.setReferenceCountOwner(this);
         MemoryAllocator ma = SimpleMemoryAllocatorImpl.getAllocator(); // fix for bug 47875
-        val = ma.allocateAndInitialize(compressedData, isSerialized, isCompressed); // TODO:KIRK:48068 race happens right after this line
+        val = ma.allocateAndInitialize(compressedData, isSerialized, isCompressed, data);
         ReferenceCountHelper.setReferenceCountOwner(null);
-        if (val instanceof ObjectChunk) {
-          val = new ObjectChunkWithHeapForm((ObjectChunk)val, data);
-        }
-//        if (val instanceof Chunk && r instanceof LocalRegion) {
-//          Chunk c = (Chunk) val;
-//          LocalRegion lr = (LocalRegion) r;
-//          SimpleMemoryAllocatorImpl.debugLog("allocated @" + Long.toHexString(c.getMemoryAddress()) + " reg=" + lr.getFullPath(), false);
-//        }
       }
       return val;
     }
@@ -1365,8 +1349,7 @@ public abstract class AbstractRegionEntry implements RegionEntry,
       byte[] data = ((StoredObject) nv).getSerializedValue();
       nv = CachedDeserializableFactory.create(data);
     }
-    // don't bother checking for SQLFire
-    if (!GemFireCacheImpl.sqlfSystem() && nv instanceof PdxInstanceImpl) {
+    if (nv instanceof PdxInstanceImpl) {
       // We do not want to put PDXs in the cache as values.
       // So get the serialized bytes and use a CachedDeserializable.
       try {
@@ -1574,7 +1557,7 @@ public abstract class AbstractRegionEntry implements RegionEntry,
 
   protected StringBuilder appendFieldsToString(final StringBuilder sb) {
     sb.append("key=").append(getKey()).append("; rawValue=")
-        .append(_getValue()); // OFFHEAP _getValue ok: the current toString on OffHeapCachedDeserializable is safe to use without incing refcount.
+        .append(_getValue()); // OFFHEAP _getValue ok: the current toString on OffHeapStoredObject is safe to use without incrementing the refcount.
     VersionStamp stamp = getVersionStamp();
     if (stamp != null) {
       sb.append("; version=").append(stamp.asVersionTag()+";member="+stamp.getMemberID());

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
index 699de2f..9058984 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
@@ -75,7 +75,6 @@ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
-import com.gemstone.gemfire.internal.offheap.ObjectChunk;
 import com.gemstone.gemfire.internal.offheap.OffHeapHelper;
 import com.gemstone.gemfire.internal.offheap.OffHeapRegionEntryHelper;
 import com.gemstone.gemfire.internal.offheap.ReferenceCountHelper;
@@ -832,6 +831,7 @@ public abstract class AbstractRegionMap implements RegionMap {
                                        boolean deferLRUCallback,
                                        VersionTag entryVersion, InternalDistributedMember sender, boolean isSynchronizing)
   {
+    assert indexUpdater == null : "indexUpdater should only exist if sqlfire";
     boolean result = false;
     boolean done = false;
     boolean cleared = false;
@@ -922,73 +922,7 @@ public abstract class AbstractRegionMap implements RegionMap {
                 }
                 final boolean oldIsTombstone = oldRe.isTombstone();
                 final int oldSize = owner.calculateRegionEntryValueSize(oldRe);
-                // Neeraj: The below if block is to handle the special
-                // scenario witnessed in SqlFabric for now. (Though its
-                // a general scenario). The scenario is that during GII
-                // it is possible that updates start coming before the
-                // base value reaches through GII. In that scenario the deltas
-                // for that particular key is kept on being added to a list
-                // of deltas. When the base value arrives through this path
-                // of GII the oldValue will be that list of deltas. When the
-                // base values arrives the deltas are applied one by one on that list.
-                // The same scenario is applicable for GemFire also but the below 
-                // code will be executed only in case of sqlfabric now. Probably
-                // the code can be made more generic for both SQL Fabric and GemFire.
-                if (indexUpdater != null) {
-                  oldValue = oldRe.getValueInVM(owner); // OFFHEAP: ListOfDeltas
-                  if (oldValue instanceof ListOfDeltas) {
-                  // apply the deltas on this new value. update index
-                  // Make a new event object
-                  // make it an insert operation
-                  LocalRegion rgn = owner;
-                  if (owner instanceof BucketRegion) {
-                    rgn = ((BucketRegion)owner).getPartitionedRegion();
-                  }
-                  event = EntryEventImpl.create(rgn, Operation.CREATE, key, null,
-                      Boolean.TRUE /* indicate that GII is in progress */,
-                      false, null);
-                  try {
-                  event.setOldValue(newValue);
-                  if (logger.isDebugEnabled()) {
-                    logger.debug("initialImagePut: received base value for list of deltas; event: {}", event);
-                  }
-                  ((ListOfDeltas)oldValue).apply(event);
-                  Object preparedNewValue =oldRe.prepareValueForCache(owner,
-                      event.getNewValueAsOffHeapDeserializedOrRaw(), true);
-                  if(preparedNewValue instanceof ObjectChunk) {
-                    event.setNewValue(preparedNewValue);
-                  }
-                  oldRe.setValue(owner, preparedNewValue, event);
-                  //event.setNewValue(event.getOldValue());
-                  event.setOldValue(null);
-                  try {
-                    indexUpdater.onEvent(owner, event, oldRe);
-                    lruEntryUpdate(oldRe);
-                    owner.updateSizeOnPut(key, oldSize, owner.calculateRegionEntryValueSize(oldRe));
-                    EntryLogger.logInitialImagePut(_getOwnerObject(), key, newValue);
-                    result = true;
-                    done = true;
-                    break;
-                  } finally {
-                    // this must be done within the oldRe sync block
-                    indexUpdater.postEvent(owner, event, oldRe, done);
-                  }
-                  } finally {
-                    if (event != null) {
-                      event.release();
-                      event = null;
-                    }
-                  }
-                  }
-                }
                 try {
-                  if (indexUpdater != null) {
-                    event = EntryEventImpl.create(owner, Operation.CREATE, key,
-                        newValue,
-                        Boolean.TRUE /* indicate that GII is in progress */,
-                        false, null);
-                    indexUpdater.onEvent(owner, event, oldRe);
-                  }
                   result = oldRe.initialImagePut(owner, lastModified, newValue, wasRecovered, acceptedVersionTag);
                   if (result) {
                     if (oldIsTombstone) {
@@ -1025,9 +959,6 @@ public abstract class AbstractRegionMap implements RegionMap {
                   }
                   done = true;
                 } finally {
-                  if (indexUpdater != null) {
-                    indexUpdater.postEvent(owner, event, oldRe, result);
-                  }
                   if (event != null) {
                     event.release();
                     event = null;
@@ -1052,13 +983,6 @@ public abstract class AbstractRegionMap implements RegionMap {
                 true, wasRecovered, versionTagAccepted);
             try {
               if (result) {
-                if (indexUpdater != null) {
-                  event = EntryEventImpl.create(owner, Operation.CREATE, key,
-                      newValue,
-                      Boolean.TRUE /* indicate that GII is in progress */,
-                      false, null);
-                  indexUpdater.onEvent(owner, event, newRe);
-                }
                 if (newValue == Token.TOMBSTONE) {
                   owner.scheduleTombstone(newRe, entryVersion);
                 } else {
@@ -1075,9 +999,6 @@ public abstract class AbstractRegionMap implements RegionMap {
               }
               done = true;
             } finally {
-              if (result && indexUpdater != null) {
-                indexUpdater.postEvent(owner, event, newRe, done);
-              }
               if (event != null) {
                 event.release();
                 event = null;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
index 5b17040..413fc87 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
@@ -1829,11 +1829,12 @@ implements Bucket
       if (!(rawNewValue instanceof CachedDeserializable)) {
         return;
       }
-      if (rawNewValue instanceof StoredObject && !((StoredObject) rawNewValue).isSerialized()) {
+      CachedDeserializable cd = (CachedDeserializable) rawNewValue;
+      if (!cd.isSerialized()) {
         // it is a byte[]; not a Delta
         return;
       }
-      Object instance = ((CachedDeserializable)rawNewValue).getValue();
+      Object instance = cd.getValue();
       if (instance instanceof com.gemstone.gemfire.Delta
           && ((com.gemstone.gemfire.Delta)instance).hasDelta()) {
         try {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BytesAndBitsForCompactor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BytesAndBitsForCompactor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BytesAndBitsForCompactor.java
index cc358f5..2746e29 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BytesAndBitsForCompactor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BytesAndBitsForCompactor.java
@@ -16,7 +16,7 @@
  */
 package com.gemstone.gemfire.internal.cache;
 
-import com.gemstone.gemfire.internal.offheap.ObjectChunk;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
 import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
 
 /**
@@ -32,11 +32,11 @@ import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
  */
 public class BytesAndBitsForCompactor {
   /**
-   * If dataChunk is set then ignore the "data" and "validLength" fields.
-   * The dataChunk field is unretained so it can only be used while the RegionEntry is still synced.
-   * When done with the dataChunk, null it out if you want to reuse the byte[] later.
+   * If offHeapData is set then ignore the "data" and "validLength" fields.
+   * The offHeapData field is unretained so it can only be used while the RegionEntry is still synced.
+   * When done with the offHeapData, null it out if you want to reuse the byte[] later.
    */
-  private @Unretained ObjectChunk dataChunk;
+  private @Unretained StoredObject offHeapData;
   private  byte[] data;
   private  byte userBits=0;
   // length of the data present in the byte array 
@@ -56,8 +56,8 @@ public class BytesAndBitsForCompactor {
   }
 
   
-  public final ObjectChunk getDataChunk() {
-    return this.dataChunk;
+  public final StoredObject getOffHeapData() {
+    return this.offHeapData;
   }
   public final byte[] getBytes() {
     return this.data;
@@ -87,8 +87,8 @@ public class BytesAndBitsForCompactor {
     this.validLength = validLength;    
     this.isReusable = isReusable;
   }
-  public void setChunkData(ObjectChunk c, byte userBits) {
-    this.dataChunk = c;
+  public void setOffHeapData(StoredObject so, byte userBits) {
+    this.offHeapData = so;
     this.userBits = userBits;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachedDeserializable.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachedDeserializable.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachedDeserializable.java
index 04fa515..9effbf6 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachedDeserializable.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachedDeserializable.java
@@ -111,4 +111,12 @@ public interface CachedDeserializable extends Sizeable
    * added by this wrapper class.
    */
   public int getValueSizeInBytes();
+  /**
+   * Returns true if the value held by this CachedDeserializable is a serialized object; returns false if it is a byte array.
+   */
+  public boolean isSerialized();
+  /**
+   * Return true if the value uses the java heap; false if not.
+   */
+  public boolean usesHeapForStorage();
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
index 327279b..26e49c9 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
@@ -28,7 +28,6 @@ import com.gemstone.gemfire.internal.Assert;
 import com.gemstone.gemfire.internal.ByteArrayDataInput;
 import com.gemstone.gemfire.internal.HeapDataOutputStream;
 import com.gemstone.gemfire.internal.Version;
-import com.gemstone.gemfire.internal.cache.DiskEntry.Helper.ValueWrapper;
 import com.gemstone.gemfire.internal.cache.DiskStoreImpl.AsyncDiskEntry;
 import com.gemstone.gemfire.internal.cache.lru.EnableLRU;
 import com.gemstone.gemfire.internal.cache.lru.LRUClockNode;
@@ -40,11 +39,9 @@ import com.gemstone.gemfire.internal.cache.versions.VersionStamp;
 import com.gemstone.gemfire.internal.cache.versions.VersionTag;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.offheap.ObjectChunk;
+import com.gemstone.gemfire.internal.offheap.AddressableMemoryManager;
 import com.gemstone.gemfire.internal.offheap.OffHeapHelper;
 import com.gemstone.gemfire.internal.offheap.ReferenceCountHelper;
-import com.gemstone.gemfire.internal.offheap.Releasable;
-import com.gemstone.gemfire.internal.offheap.UnsafeMemoryChunk;
 import com.gemstone.gemfire.internal.offheap.StoredObject;
 import com.gemstone.gemfire.internal.offheap.annotations.Released;
 import com.gemstone.gemfire.internal.offheap.annotations.Retained;
@@ -200,7 +197,7 @@ public interface DiskEntry extends RegionEntry {
     /**
      * Get the value of an entry that is on disk without
      * faulting it in . It checks for the presence in the buffer also.
-     * This method is used for concurrent map operations, SQLFabric and CQ processing
+     * This method is used for concurrent map operations and CQ processing
      * 
      * @throws DiskAccessException
      * @since 5.1
@@ -208,18 +205,11 @@ public interface DiskEntry extends RegionEntry {
     static Object getValueOnDiskOrBuffer(DiskEntry entry, DiskRegion dr, RegionEntryContext context) {
       @Released Object v = getOffHeapValueOnDiskOrBuffer(entry, dr, context);
       if (v instanceof CachedDeserializable) {
-        if (v instanceof ObjectChunk) {
-          @Released ObjectChunk ohv = (ObjectChunk) v;
-          try {
-            v = ohv.getDeserializedValue(null, null);
-            if (v == ohv) {
-              throw new IllegalStateException("sqlf tried to use getValueOnDiskOrBuffer");
-            }
-          } finally {
-            ohv.release(); // OFFHEAP the offheap ref is decremented here
-          }
-        } else {
-          v = ((CachedDeserializable)v).getDeserializedValue(null, null);
+        CachedDeserializable cd = (CachedDeserializable) v;
+        try {
+          v = cd.getDeserializedValue(null, null);
+        } finally {
+          OffHeapHelper.release(cd); // If v was off-heap it is released here
         }
       }
       return v;
@@ -391,10 +381,11 @@ public interface DiskEntry extends RegionEntry {
         // fix for bug 31757
         return false;
       } else if (v instanceof CachedDeserializable) {
+        CachedDeserializable cd = (CachedDeserializable) v;
         try {
-          if (v instanceof StoredObject && !((StoredObject) v).isSerialized()) {
+          if (!cd.isSerialized()) {
             entry.setSerialized(false);
-            entry.value = ((StoredObject) v).getDeserializedForReading();
+            entry.value = cd.getDeserializedForReading();
             
             //For SQLFire we prefer eager deserialized
 //            if(v instanceof ByteSource) {
@@ -403,7 +394,7 @@ public interface DiskEntry extends RegionEntry {
           } else {
             // don't serialize here if it is not already serialized
             
-            Object tmp = ((CachedDeserializable)v).getValue();
+            Object tmp = cd.getValue();
           //For SQLFire we prefer eager deserialized
 //            if(v instanceof ByteSource) {
 //              entry.setEagerDeserialize();
@@ -679,27 +670,28 @@ public interface DiskEntry extends RegionEntry {
     }
     
     /**
-     * Note that the Chunk this ValueWrapper is created with
+     * Note that the StoredObject this ValueWrapper is created with
      * is unretained so it must be used before the owner of
-     * the chunk releases it.
+     * the StoredObject releases it.
      * Since the RegionEntry that has the value we are writing to
      * disk has it retained we are ok as long as this ValueWrapper's
      * life ends before the RegionEntry sync is released.
-     * Note that this class is only used with uncompressed chunks.
+     * Note that this class is only used with uncompressed StoredObjects.
      */
-    public static class ChunkValueWrapper implements ValueWrapper {
-      private final @Unretained ObjectChunk chunk;
-      public ChunkValueWrapper(ObjectChunk c) {
-        assert !c.isCompressed();
-        this.chunk = c;
+    public static class OffHeapValueWrapper implements ValueWrapper {
+      private final @Unretained StoredObject offHeapData;
+      public OffHeapValueWrapper(StoredObject so) {
+        assert so.hasRefCount();
+        assert !so.isCompressed();
+        this.offHeapData = so;
       }
       @Override
       public boolean isSerialized() {
-        return this.chunk.isSerialized();
+        return this.offHeapData.isSerialized();
       }
       @Override
       public int getLength() {
-        return this.chunk.getDataSize();
+        return this.offHeapData.getDataSize();
       }
       @Override
       public byte getUserBits() {
@@ -716,21 +708,21 @@ public interface DiskEntry extends RegionEntry {
           return;
         }
         if (maxOffset > bb.capacity()) {
-          ByteBuffer chunkbb = this.chunk.createDirectByteBuffer();
+          ByteBuffer chunkbb = this.offHeapData.createDirectByteBuffer();
           if (chunkbb != null) {
             flushable.flush(bb, chunkbb);
             return;
           }
         }
-        final long bbAddress = ObjectChunk.getDirectByteBufferAddress(bb);
+        final long bbAddress = AddressableMemoryManager.getDirectByteBufferAddress(bb);
         if (bbAddress != 0L) {
           int bytesRemaining = maxOffset;
           int availableSpace = bb.remaining();
           long addrToWrite = bbAddress + bb.position();
-          long addrToRead = this.chunk.getAddressForReading(0, maxOffset);
+          long addrToRead = this.offHeapData.getAddressForReadingData(0, maxOffset);
           if (bytesRemaining > availableSpace) {
             do {
-              UnsafeMemoryChunk.copyMemory(addrToRead, addrToWrite, availableSpace);
+              AddressableMemoryManager.copyMemory(addrToRead, addrToWrite, availableSpace);
               bb.position(bb.position()+availableSpace);
               addrToRead += availableSpace;
               bytesRemaining -= availableSpace;
@@ -739,13 +731,13 @@ public interface DiskEntry extends RegionEntry {
               availableSpace = bb.remaining();
             } while (bytesRemaining > availableSpace);
           }
-          UnsafeMemoryChunk.copyMemory(addrToRead, addrToWrite, bytesRemaining);
+          AddressableMemoryManager.copyMemory(addrToRead, addrToWrite, bytesRemaining);
           bb.position(bb.position()+bytesRemaining);
         } else {
-          long addr = this.chunk.getAddressForReading(0, maxOffset);
+          long addr = this.offHeapData.getAddressForReadingData(0, maxOffset);
           final long endAddr = addr + maxOffset;
           while (addr != endAddr) {
-            bb.put(UnsafeMemoryChunk.readAbsoluteByte(addr));
+            bb.put(AddressableMemoryManager.readByte(addr));
             addr++;
             if (!bb.hasRemaining()) {
               flushable.flush();
@@ -755,10 +747,28 @@ public interface DiskEntry extends RegionEntry {
       }
       @Override
       public String getBytesAsString() {
-        return this.chunk.getStringForm();
+        return this.offHeapData.getStringForm();
       }
     }
 
+    /**
+     * Returns true if the given object is off-heap
+     * and it is worth wrapping a reference to it
+     * instead of copying its data to the heap.
+     * Currently all StoredObjects with a refCount are
+     * wrapped.
+     */
+    public static boolean wrapOffHeapReference(Object o) {
+      if (o instanceof StoredObject) {
+        StoredObject so = (StoredObject) o;
+        if (so.hasRefCount()) {
+          // any StoredObject with a refCount is currently wrapped by reference
+          return true;
+        }
+      }
+      return false;
+    }
+    
     public static ValueWrapper createValueWrapper(Object value, EntryEventImpl event) {
       if (value == Token.INVALID) {
         // even though it is not serialized we say it is because
@@ -782,19 +792,14 @@ public interface DiskEntry extends RegionEntry {
         byte[] bytes;
         if (value instanceof CachedDeserializable) {
           CachedDeserializable proxy = (CachedDeserializable)value;
-          if (proxy instanceof ObjectChunk) {
-            return new ChunkValueWrapper((ObjectChunk) proxy);
+          if (wrapOffHeapReference(proxy)) {
+            return new OffHeapValueWrapper((StoredObject) proxy);
           }
-          if (proxy instanceof StoredObject) {
-            StoredObject ohproxy = (StoredObject) proxy;
-            isSerializedObject = ohproxy.isSerialized();
-            if (isSerializedObject) {
-              bytes = ohproxy.getSerializedValue();
-            } else {
-              bytes = (byte[]) ohproxy.getDeserializedForReading();
-            }
-          } else {
+          isSerializedObject = proxy.isSerialized();
+          if (isSerializedObject) {
             bytes = proxy.getSerializedValue();
+          } else {
+            bytes = (byte[]) proxy.getDeserializedForReading();
           }
           if (event != null && isSerializedObject) {
             event.setCachedSerializedNewValue(bytes);
@@ -826,15 +831,15 @@ public interface DiskEntry extends RegionEntry {
         // For off-heap it should be faster to pass a reference to the
         // StoredObject instead of using the cached byte[] (unless it is also compressed).
         // Since NIO is used if the chunk of memory is large we can write it
-        // to the file with using the off-heap memory with no extra copying.
+        // to the file using the off-heap memory with no extra copying.
         // So we give preference to getRawNewValue over getCachedSerializedNewValue
         Object rawValue = null;
         if (!event.hasDelta()) {
           // We don't do this for the delta case because getRawNewValue returns delta
           // and we want to write the entire new value to disk.
           rawValue = event.getRawNewValue();
-          if (rawValue instanceof ObjectChunk) {
-            return new ChunkValueWrapper((ObjectChunk) rawValue);
+          if (wrapOffHeapReference(rawValue)) {
+            return new OffHeapValueWrapper((StoredObject) rawValue);
           }
         }
         if (event.getCachedSerializedNewValue() != null) {
@@ -1161,10 +1166,6 @@ public interface DiskEntry extends RegionEntry {
       if (result instanceof CachedDeserializable) {
         result = ((CachedDeserializable)result).getDeserializedValue(null, null);
       }
-      if (result instanceof StoredObject) {
-        ((StoredObject) result).release();
-        throw new IllegalStateException("sqlf tried to use getValueInVMOrDiskWithoutFaultIn");
-      }
       return result;
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskStoreImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskStoreImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskStoreImpl.java
index e4ef21d..e813058 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskStoreImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskStoreImpl.java
@@ -110,10 +110,6 @@ import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
-import com.gemstone.gemfire.internal.offheap.MemoryChunkWithRefCount;
-import com.gemstone.gemfire.internal.offheap.OffHeapHelper;
-import com.gemstone.gemfire.internal.offheap.annotations.Released;
-import com.gemstone.gemfire.internal.offheap.annotations.Retained;
 import com.gemstone.gemfire.internal.util.BlobHelper;
 import com.gemstone.gemfire.pdx.internal.EnumInfo;
 import com.gemstone.gemfire.pdx.internal.PdxField;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
index 19496da..111a6d5 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
@@ -121,9 +121,6 @@ import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewa
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
-import com.gemstone.gemfire.internal.offheap.ObjectChunk;
-import com.gemstone.gemfire.internal.offheap.OffHeapHelper;
-import com.gemstone.gemfire.internal.offheap.annotations.Released;
 import com.gemstone.gemfire.internal.offheap.annotations.Retained;
 import com.gemstone.gemfire.internal.sequencelog.RegionLogger;
 import com.gemstone.gemfire.internal.util.concurrent.StoppableCountDownLatch;
@@ -2449,7 +2446,6 @@ public class DistributedRegion extends LocalRegion implements
     boolean fromServer = false;
     EntryEventImpl event = null;
     @Retained Object result = null;
-    boolean incrementUseCountForSqlf = false;
     try {
     {
       if (this.srp != null) {
@@ -2515,7 +2511,6 @@ public class DistributedRegion extends LocalRegion implements
           	((BucketRegion)this).handleWANEvent(event);
           }
           re = basicPutEntry(event, lastModified);
-          incrementUseCountForSqlf = GemFireCacheImpl.sqlfSystem() ;
         } catch (ConcurrentCacheModificationException e) {
           // the cache was modified while we were searching for this entry and
           // the netsearch result was elided.  Return the current value from the cache
@@ -2547,11 +2542,6 @@ public class DistributedRegion extends LocalRegion implements
     } else {
       result = event.getNewValue();     
     }
-    //For SQLFire , we need to increment the use count so that returned
-    //object has use count 2
-    if( incrementUseCountForSqlf && result instanceof ObjectChunk) {
-      ((ObjectChunk)result).retain();
-    }
     return result;
     } finally {
       if (event != null) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
index 6c58790..dfd20ef 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
@@ -73,7 +73,6 @@ import com.gemstone.gemfire.internal.lang.StringUtils;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
-import com.gemstone.gemfire.internal.offheap.ObjectChunk;
 import com.gemstone.gemfire.internal.offheap.OffHeapHelper;
 import com.gemstone.gemfire.internal.offheap.OffHeapRegionEntryHelper;
 import com.gemstone.gemfire.internal.offheap.ReferenceCountHelper;
@@ -834,10 +833,7 @@ public class EntryEventImpl
       }
       boolean doCopyOnRead = getRegion().isCopyOnRead();
       if (ov != null) {
-        if (ov instanceof StoredObject) {
-          // TODO OFFHEAP: returns off-heap PdxInstance
-          return ((StoredObject) ov).getValueAsDeserializedHeapObject();
-        } else
+        // TODO OFFHEAP: returns off-heap PdxInstance
         if (ov instanceof CachedDeserializable) {
           CachedDeserializable cd = (CachedDeserializable)ov;
           if (doCopyOnRead) {
@@ -930,9 +926,9 @@ public class EntryEventImpl
     if (this.offHeapOk) {
       OffHeapHelper.releaseAndTrackOwner(this.newValue, this);
     }
-    if (v instanceof ObjectChunk) {
+    if (isOffHeapReference(v)) {
       ReferenceCountHelper.setReferenceCountOwner(this);
-      if (!((ObjectChunk) v).retain()) {
+      if (!((StoredObject) v).retain()) {
         ReferenceCountHelper.setReferenceCountOwner(null);
         this.newValue = null;
         return;
@@ -946,19 +942,23 @@ public class EntryEventImpl
    * Returns true if this event has a reference to an off-heap new or old value.
    */
   public boolean hasOffHeapValue() {
-    return (this.newValue instanceof ObjectChunk) || (this.oldValue instanceof ObjectChunk);
+    return isOffHeapReference(this.newValue) || isOffHeapReference(this.oldValue);
   }
   
   @Unretained
   protected final Object basicGetNewValue() {
     Object result = this.newValue;
-    if (!this.offHeapOk && result instanceof ObjectChunk) {
+    if (!this.offHeapOk && isOffHeapReference(result)) {
       //this.region.getCache().getLogger().info("DEBUG new value already freed " + System.identityHashCode(result));
       throw new IllegalStateException("Attempt to access off heap value after the EntryEvent was released.");
     }
     return result;
   }
   
+  private static boolean isOffHeapReference(Object ref) {
+    return (ref instanceof StoredObject) && ((StoredObject)ref).hasRefCount();
+  }
+  
   private class OldValueOwner {
     private EntryEventImpl getEvent() {
       return EntryEventImpl.this;
@@ -992,12 +992,10 @@ public class EntryEventImpl
     @Released final Object curOldValue = this.oldValue;
     if (v == curOldValue) return;
     if (this.offHeapOk) {
-      if (curOldValue instanceof ObjectChunk) {
-        if (ReferenceCountHelper.trackReferenceCounts()) {
-          OffHeapHelper.releaseAndTrackOwner(curOldValue, new OldValueOwner());
-        } else {
-          OffHeapHelper.release(curOldValue);
-        }
+      if (ReferenceCountHelper.trackReferenceCounts()) {
+        OffHeapHelper.releaseAndTrackOwner(curOldValue, new OldValueOwner());
+      } else {
+        OffHeapHelper.release(curOldValue);
       }
     }
     
@@ -1008,17 +1006,18 @@ public class EntryEventImpl
   private void retainAndSetOldValue(@Retained(ENTRY_EVENT_OLD_VALUE) Object v) {
     if (v == this.oldValue) return;
     
-    if (v instanceof ObjectChunk) {
+    if (isOffHeapReference(v)) {
+      StoredObject so = (StoredObject) v;
       if (ReferenceCountHelper.trackReferenceCounts()) {
         ReferenceCountHelper.setReferenceCountOwner(new OldValueOwner());
-        boolean couldNotRetain = (!((ObjectChunk) v).retain());
+        boolean couldNotRetain = (!so.retain());
         ReferenceCountHelper.setReferenceCountOwner(null);
         if (couldNotRetain) {
           this.oldValue = null;
           return;
         }
       } else {
-        if (!((ObjectChunk) v).retain()) {
+        if (!so.retain()) {
           this.oldValue = null;
           return;
         }
@@ -1031,7 +1030,7 @@ public class EntryEventImpl
   private Object basicGetOldValue() {
     @Unretained(ENTRY_EVENT_OLD_VALUE)
     Object result = this.oldValue;
-    if (!this.offHeapOk && result instanceof ObjectChunk) {
+    if (!this.offHeapOk && isOffHeapReference(result)) {
       //this.region.getCache().getLogger().info("DEBUG old value already freed " + System.identityHashCode(result));
       throw new IllegalStateException("Attempt to access off heap value after the EntryEvent was released.");
     }
@@ -1056,7 +1055,6 @@ public class EntryEventImpl
   }
   /**
    * Just like getRawOldValue except if the raw old value is off-heap deserialize it.
-   * Note that in some cases sqlf ignores the request to deserialize.
    */
   @Unretained(ENTRY_EVENT_OLD_VALUE)
   public final Object getOldValueAsOffHeapDeserializedOrRaw() {
@@ -1107,11 +1105,8 @@ public class EntryEventImpl
         // I'm not sure this can even happen
         return AbstractRegion.handleNotAvailable(nv);
       }
-      if (nv instanceof StoredObject) {
-        // TODO OFFHEAP currently we copy offheap new value to the heap here. Check callers of this method to see if they can be optimized to use offheap values.
-        // TODO OFFHEAP: returns off-heap PdxInstance
-        return ((StoredObject) nv).getValueAsDeserializedHeapObject();
-      } else
+      // TODO OFFHEAP currently we copy offheap new value to the heap here. Check callers of this method to see if they can be optimized to use offheap values.
+      // TODO OFFHEAP: returns off-heap PdxInstance
       if (nv instanceof CachedDeserializable) {
         CachedDeserializable cd = (CachedDeserializable)nv;
         Object v = null;
@@ -1284,18 +1279,16 @@ public class EntryEventImpl
     @Unretained(ENTRY_EVENT_NEW_VALUE)
     final Object tmp = basicGetNewValue();
     if (tmp instanceof CachedDeserializable) {
-      if (tmp instanceof StoredObject) {
-        if (!((StoredObject) tmp).isSerialized()) {
-          // TODO OFFHEAP can we handle offheap byte[] better?
-          return null;
-        }
+      CachedDeserializable cd = (CachedDeserializable) tmp;
+      if (!cd.isSerialized()) {
+        // TODO OFFHEAP can we handle offheap byte[] better?
+        return null;
       }
       byte[] bytes = this.newValueBytes;
       if (bytes == null) {
         bytes = this.cachedSerializedNewValue;
       }
-      return new SerializedCacheValueImpl(this, getRegion(), this.re,
-          (CachedDeserializable)tmp, bytes);
+      return new SerializedCacheValueImpl(this, getRegion(), this.re, cd, bytes);
     } else {
       // Note we return null even if cachedSerializedNewValue is not null.
       // This is because some callers of this method use it to indicate
@@ -1360,7 +1353,7 @@ public class EntryEventImpl
       @Unretained(ENTRY_EVENT_NEW_VALUE)
       final StoredObject so = (StoredObject) nv;
       final boolean isSerialized = so.isSerialized();
-      if (nv instanceof ObjectChunk) {
+      if (so.hasRefCount()) {
         if (importer.isUnretainedNewReferenceOk()) {
           importer.importNewObject(nv, isSerialized);
         } else {
@@ -1447,7 +1440,7 @@ public class EntryEventImpl
     if (ov instanceof StoredObject) {
       final StoredObject so = (StoredObject) ov;
       final boolean isSerialized = so.isSerialized();
-      if (ov instanceof ObjectChunk) {
+      if (so.hasRefCount()) {
         if (importer.isUnretainedOldReferenceOk()) {
           importer.importOldObject(ov, isSerialized);
         } else {
@@ -1501,7 +1494,6 @@ public class EntryEventImpl
   }
   /**
    * Just like getRawNewValue(true) except if the raw new value is off-heap deserialize it.
-   * Note that in some cases sqlf ignores the request to deserialize.
    */
   @Unretained(ENTRY_EVENT_NEW_VALUE)
   public final Object getNewValueAsOffHeapDeserializedOrRaw() {
@@ -1518,16 +1510,7 @@ public class EntryEventImpl
    */
   @Retained(ENTRY_EVENT_NEW_VALUE)
   public StoredObject getOffHeapNewValue() {
-    final Object tmp = basicGetNewValue();
-    if (tmp instanceof StoredObject) {
-      StoredObject result = (StoredObject) tmp;
-      if (!result.retain()) {
-        return null;
-      }
-      return result;
-    } else {
-      return null;
-    }
+    return convertToStoredObject(basicGetNewValue());
   }
   
   /**
@@ -1536,28 +1519,23 @@ public class EntryEventImpl
    */
   @Retained(ENTRY_EVENT_OLD_VALUE)
   public StoredObject getOffHeapOldValue() {
-    final Object tmp = basicGetOldValue();
-    if (tmp instanceof StoredObject) {
-      StoredObject result = (StoredObject) tmp;
-      if (!result.retain()) {
-        return null;
-      }
-      return result;
-    } else {
+    return convertToStoredObject(basicGetOldValue());
+  }
+
+  private static StoredObject convertToStoredObject(final Object tmp) {
+    if (!(tmp instanceof StoredObject)) {
+      return null;
+    }
+    StoredObject result = (StoredObject) tmp;
+    if (!result.retain()) {
       return null;
     }
+    return result;
   }
-
-  /**
-   * Result may be unretained because sqlf getDeserializedForReading returns unretained.
-   */
+  
   public final Object getDeserializedValue() {
     if (this.delta == null) {
       final Object val = basicGetNewValue();
-      if (val instanceof StoredObject) {
-        // TODO OFFHEAP: returns off-heap PdxInstance
-        return ((StoredObject) val).getValueAsDeserializedHeapObject();
-      } else 
       if (val instanceof CachedDeserializable) {
         return ((CachedDeserializable)val).getDeserializedForReading();
       }
@@ -1869,8 +1847,8 @@ public class EntryEventImpl
     Object preparedV = reentry.prepareValueForCache(this.region, v, this, this.hasDelta());
     if (preparedV != v) {
       v = preparedV;
-      if (v instanceof ObjectChunk) {
-        if (!((ObjectChunk) v).isCompressed()) { // fix bug 52109
+      if (v instanceof StoredObject) {
+        if (!((StoredObject) v).isCompressed()) { // fix bug 52109
           // If we put it off heap and it is not compressed then remember that value.
           // Otherwise we want to remember the decompressed value in the event.
           basicSetNewValue(v);
@@ -1931,8 +1909,8 @@ public class EntryEventImpl
       success = true;
     }
     } finally {
-      if (!success && reentry instanceof OffHeapRegionEntry && v instanceof ObjectChunk) {
-        OffHeapRegionEntryHelper.releaseEntry((OffHeapRegionEntry)reentry, (ObjectChunk)v);
+      if (!success && reentry instanceof OffHeapRegionEntry && v instanceof StoredObject) {
+        OffHeapRegionEntryHelper.releaseEntry((OffHeapRegionEntry)reentry, (StoredObject)v);
       }      
     }
     if (logger.isTraceEnabled()) {
@@ -2230,9 +2208,9 @@ public class EntryEventImpl
 
   /**
    * If a PdxInstance is returned then it will have an unretained reference
-   * to Chunk's off-heap address.
+   * to the StoredObject's off-heap address.
    */
-  public static @Unretained Object deserializeChunk(ObjectChunk bytes) {
+  public static @Unretained Object deserializeOffHeap(StoredObject bytes) {
     if (bytes == null)
       return null;
     try {
@@ -2418,9 +2396,7 @@ public class EntryEventImpl
         Object nv = basicGetNewValue();
         boolean newValueSerialized = nv instanceof CachedDeserializable;
         if (newValueSerialized) {
-          if (nv instanceof StoredObject) {
-            newValueSerialized = ((StoredObject) nv).isSerialized();
-          }
+          newValueSerialized = ((CachedDeserializable) nv).isSerialized();
         }
         out.writeBoolean(newValueSerialized);
         if (newValueSerialized) {
@@ -2443,9 +2419,7 @@ public class EntryEventImpl
       Object ov = basicGetOldValue();
       boolean oldValueSerialized = ov instanceof CachedDeserializable;
       if (oldValueSerialized) {
-        if (ov instanceof StoredObject) {
-          oldValueSerialized = ((StoredObject) ov).isSerialized();
-        }
+        oldValueSerialized = ((CachedDeserializable) ov).isSerialized();
       }
       out.writeBoolean(oldValueSerialized);
       if (oldValueSerialized) {
@@ -2517,14 +2491,12 @@ public class EntryEventImpl
     @Unretained(ENTRY_EVENT_OLD_VALUE)
     final Object tmp = basicGetOldValue();
     if (tmp instanceof CachedDeserializable) {
-      if (tmp instanceof StoredObject) {
-        if (!((StoredObject) tmp).isSerialized()) {
-          // TODO OFFHEAP can we handle offheap byte[] better?
-          return null;
-        }
+      CachedDeserializable cd = (CachedDeserializable) tmp;
+      if (!cd.isSerialized()) {
+        // TODO OFFHEAP can we handle offheap byte[] better?
+        return null;
       }
-      return new SerializedCacheValueImpl(this, this.region, this.re,
-          (CachedDeserializable)tmp, this.oldValueBytes);
+      return new SerializedCacheValueImpl(this, this.region, this.re, cd, this.oldValueBytes);
     }
     else {
       return null;
@@ -2881,7 +2853,7 @@ public class EntryEventImpl
     private final byte[] serializedValue;
     
     SerializedCacheValueImpl(EntryEventImpl event, Region r, RegionEntry re, @Unretained CachedDeserializable cd, byte[] serializedBytes) {
-      if (cd instanceof ObjectChunk) {
+      if (isOffHeapReference(cd)) {
         this.event = event;
       } else {
         this.event = null;
@@ -2958,6 +2930,16 @@ public class EntryEventImpl
     public void sendTo(DataOutput out) throws IOException {
       DataSerializer.writeObject(getCd(), out);
     }
+
+    @Override
+    public boolean isSerialized() {
+      return getCd().isSerialized();
+    }
+
+    @Override
+    public boolean usesHeapForStorage() {
+      return getCd().usesHeapForStorage();
+    }
   }
 //////////////////////////////////////////////////////////////////////////////////////////
   
@@ -3060,14 +3042,14 @@ public class EntryEventImpl
     Object nv = basicGetNewValue();
     this.offHeapOk = false;
     
-    if (ov instanceof ObjectChunk) {
+    if (ov instanceof StoredObject) {
       //this.region.getCache().getLogger().info("DEBUG freeing ref to old value on " + System.identityHashCode(ov));
       if (ReferenceCountHelper.trackReferenceCounts()) {
         ReferenceCountHelper.setReferenceCountOwner(new OldValueOwner());
-        ((ObjectChunk) ov).release();
+        ((StoredObject) ov).release();
         ReferenceCountHelper.setReferenceCountOwner(null);
       } else {
-        ((ObjectChunk) ov).release();
+        ((StoredObject) ov).release();
       }
     }
     OffHeapHelper.releaseAndTrackOwner(nv, this);
@@ -3078,7 +3060,7 @@ public class EntryEventImpl
    * Once this is called on an event it does not need to have release called.
    */
   public void disallowOffHeapValues() {
-    if (this.newValue instanceof ObjectChunk || this.oldValue instanceof ObjectChunk) {
+    if (isOffHeapReference(this.newValue) || isOffHeapReference(this.oldValue)) {
       throw new IllegalStateException("This event does not support off-heap values");
     }
     this.offHeapOk = false;
@@ -3087,12 +3069,11 @@ public class EntryEventImpl
   /**
    * This copies the off-heap new and/or old value to the heap.
    * As a result the current off-heap new/old will be released.
-   * @throws IllegalStateException if called with an event for sqlf data.
    */
   @Released({ENTRY_EVENT_NEW_VALUE, ENTRY_EVENT_OLD_VALUE})
   public void copyOffHeapToHeap() {
     Object ov = basicGetOldValue();
-    if (ov instanceof ObjectChunk) {
+    if (isOffHeapReference(ov)) {
       if (ReferenceCountHelper.trackReferenceCounts()) {
         ReferenceCountHelper.setReferenceCountOwner(new OldValueOwner());
         this.oldValue = OffHeapHelper.copyAndReleaseIfNeeded(ov);
@@ -3102,19 +3083,19 @@ public class EntryEventImpl
       }
     }
     Object nv = basicGetNewValue();
-    if (nv instanceof ObjectChunk) {
+    if (isOffHeapReference(nv)) {
       ReferenceCountHelper.setReferenceCountOwner(this);
       this.newValue = OffHeapHelper.copyAndReleaseIfNeeded(nv);
       ReferenceCountHelper.setReferenceCountOwner(null);
     }
-    if (this.newValue instanceof ObjectChunk || this.oldValue instanceof ObjectChunk) {
+    if (isOffHeapReference(this.newValue) || isOffHeapReference(this.oldValue)) {
       throw new IllegalStateException("event's old/new value still off-heap after calling copyOffHeapToHeap");
     }
     this.offHeapOk = false;
   }
 
   public boolean isOldValueOffHeap() {
-    return this.oldValue instanceof ObjectChunk;
+    return isOffHeapReference(this.oldValue);
   }
   public final boolean isFetchFromHDFS() {
     return fetchFromHDFS;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
index b6d8c49..966130a 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
@@ -203,7 +203,6 @@ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
-import com.gemstone.gemfire.internal.offheap.ObjectChunk;
 import com.gemstone.gemfire.internal.offheap.OffHeapHelper;
 import com.gemstone.gemfire.internal.offheap.ReferenceCountHelper;
 import com.gemstone.gemfire.internal.offheap.StoredObject;
@@ -1423,6 +1422,7 @@ public class LocalRegion extends AbstractRegion
   public Object getRetained(Object key, Object aCallbackArgument,
       boolean generateCallbacks, boolean disableCopyOnRead,
       ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean opScopeIsLocal) throws TimeoutException, CacheLoaderException {
+    // TODO OFFHEAP: the last parameter "retainResult" should be true for getRetained. Need to look into why it is being set to false.
     return get(key, aCallbackArgument, generateCallbacks, disableCopyOnRead, true, requestingClient, clientEvent, returnTombstones, opScopeIsLocal, true, false);
   }
   /**
@@ -1585,14 +1585,7 @@ public class LocalRegion extends AbstractRegion
           } else if (!disableCopyOnRead) {
             result = conditionalCopy(result);
           }
-          //For sqlf since the deserialized value is nothing but chunk
-          // before returning the found value increase its use count
-          if(GemFireCacheImpl.sqlfSystem() && result instanceof ObjectChunk) {
-            if(!((ObjectChunk)result).retain()) {
-              return null;
-            }
-          }
-          // what was a miss is now a hit
+         // what was a miss is now a hit
           RegionEntry re = null;
           if (isCreate) {
             re = basicGetEntry(keyInfo.getKey());

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/Oplog.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/Oplog.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/Oplog.java
index 3a2d1ed..94e61fb 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/Oplog.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/Oplog.java
@@ -4592,8 +4592,8 @@ public final class Oplog implements CompactableOplog, Flushable {
         // TODO: compaction needs to get version?
         byte userBits = wrapper.getBits();
         ValueWrapper vw;
-        if (wrapper.getDataChunk() != null) {
-          vw = new DiskEntry.Helper.ChunkValueWrapper(wrapper.getDataChunk());
+        if (wrapper.getOffHeapData() != null) {
+          vw = new DiskEntry.Helper.OffHeapValueWrapper(wrapper.getOffHeapData());
         } else {
           vw = new DiskEntry.Helper.CompactorValueWrapper(wrapper.getBytes(), wrapper.getValidLength());
         }
@@ -4613,8 +4613,8 @@ public final class Oplog implements CompactableOplog, Flushable {
             LocalizedStrings.Oplog_FAILED_WRITING_KEY_TO_0_DUE_TO_FAILURE_IN_ACQUIRING_READ_LOCK_FOR_ASYNCH_WRITING
                 .toLocalizedString(this.diskFile.getPath()), ie, getParent());
       } finally {
-        if (wrapper.getDataChunk() != null) {
-          wrapper.setChunkData(null, (byte) 0);
+        if (wrapper.getOffHeapData() != null) {
+          wrapper.setOffHeapData(null, (byte) 0);
         }
         if (exceptionOccured) {
           did.setValueLength(len);
@@ -6127,8 +6127,8 @@ public final class Oplog implements CompactableOplog, Flushable {
                     // skip this guy his oplogId changed
                     if (!wrapper.isReusable()) {
                       wrapper = new BytesAndBitsForCompactor();
-                    } else if (wrapper.getDataChunk() != null) {
-                      wrapper.setChunkData(null, (byte) 0);
+                    } else if (wrapper.getOffHeapData() != null) {
+                      wrapper.setOffHeapData(null, (byte) 0);
                     }
                     continue;
                   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
index 541c453..5a5de2c 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
@@ -252,7 +252,6 @@ import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
-import com.gemstone.gemfire.internal.offheap.ObjectChunk;
 import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
 import com.gemstone.gemfire.internal.sequencelog.RegionLogger;
 import com.gemstone.gemfire.internal.util.TransformUtils;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PreferBytesCachedDeserializable.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PreferBytesCachedDeserializable.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PreferBytesCachedDeserializable.java
index 7ed88b5..933362d 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PreferBytesCachedDeserializable.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PreferBytesCachedDeserializable.java
@@ -150,8 +150,17 @@ public final class PreferBytesCachedDeserializable implements CachedDeserializab
 
   @Override
   public Version[] getSerializationVersions() {
-    // TODO Auto-generated method stub
     return null;
   }
 
+  @Override
+  public boolean isSerialized() {
+    return true;
+  }
+
+  @Override
+  public boolean usesHeapForStorage() {
+    return true;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntry.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntry.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntry.java
index e2cfb90..08101c2 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntry.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntry.java
@@ -31,7 +31,6 @@ import com.gemstone.gemfire.internal.cache.lru.NewLRUClockHand;
 import com.gemstone.gemfire.internal.cache.versions.VersionSource;
 import com.gemstone.gemfire.internal.cache.versions.VersionStamp;
 import com.gemstone.gemfire.internal.cache.versions.VersionTag;
-import com.gemstone.gemfire.internal.offheap.MemoryChunkWithRefCount;
 import com.gemstone.gemfire.internal.offheap.annotations.Released;
 import com.gemstone.gemfire.internal.offheap.annotations.Retained;
 import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
@@ -237,10 +236,9 @@ public interface RegionEntry {
    */
   public void setValue(RegionEntryContext context, Object value, EntryEventImpl event) throws RegionClearedException;
   /**
-   * Obtain and return the value of this entry using {@link #_getValue()}.
-   * If the value is a MemoryChunkWithRefCount then increment its refcount.
-   * WARNING: if a MemoryChunkWithRefCount is returned then the caller MUST
-   * call {@link MemoryChunkWithRefCount#release()}.
+   * Obtain, retain and return the value of this entry.
+   * WARNING: if a StoredObject is returned and it has a refCount then the caller MUST
+   * make sure that {@link StoredObject#release()} is called before the returned object is garbage collected.
    * 
    * This is only retained in off-heap subclasses.  However, it's marked as
    * Retained here so that callers are aware that the value may be retained.

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteDestroyMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteDestroyMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteDestroyMessage.java
index a3c9402..4a8c101 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteDestroyMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteDestroyMessage.java
@@ -210,10 +210,10 @@ public class RemoteDestroyMessage extends RemoteOperationMessageWithDirectReply
       this.hasOldValue = true;
       CachedDeserializable cd = (CachedDeserializable) event.getSerializedOldValue();
       if (cd != null) {
-        if (cd instanceof StoredObject && !((StoredObject) cd).isSerialized()) {
+        if (!cd.isSerialized()) {
           // it is a byte[]
           this.oldValueIsSerialized = false;
-          setOldValBytes((byte[]) ((StoredObject) cd).getDeserializedForReading());
+          setOldValBytes((byte[]) cd.getDeserializedForReading());
         } else {
           this.oldValueIsSerialized = true;
           Object o = cd.getValue();