You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2016/02/22 22:43:57 UTC

[069/100] [abbrv] incubator-geode git commit: GEODE-917: Merge branch 'feature/GEODE-917' into develop

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/OffHeapHelperJUnitTest.java
----------------------------------------------------------------------
diff --cc geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/OffHeapHelperJUnitTest.java
index 0000000,fd0eb4f..b1e3af0
mode 000000,100644..100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/OffHeapHelperJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/OffHeapHelperJUnitTest.java
@@@ -1,0 -1,314 +1,314 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package com.gemstone.gemfire.internal.offheap;
+ 
+ import static org.junit.Assert.*;
+ import static org.mockito.Mockito.mock;
+ import static org.hamcrest.CoreMatchers.*;
+ 
+ import java.nio.ByteBuffer;
+ 
+ import org.junit.After;
+ import org.junit.Before;
+ import org.junit.Test;
+ import org.junit.experimental.categories.Category;
+ 
+ import com.gemstone.gemfire.LogWriter;
+ import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+ import com.gemstone.gemfire.internal.cache.VMCachedDeserializable;
+ import com.gemstone.gemfire.test.junit.categories.UnitTest;
+ 
+ /**
+  * @author khowe
+  */
+ @Category(UnitTest.class)
+ public class OffHeapHelperJUnitTest extends AbstractStoredObjectTestBase {
+ 
+   private MemoryChunkWithRefCount storedObject                 = null;
+   private Object                  deserializedRegionEntryValue = null;
+   private byte[]                  serializedRegionEntryValue   = null;
+   private MemoryAllocator         ma;
+ 
+   @Before
+   public void setUp() {
+     OutOfOffHeapMemoryListener ooohml = mock(OutOfOffHeapMemoryListener.class);
+     OffHeapMemoryStats stats = mock(OffHeapMemoryStats.class);
+     LogWriter lw = mock(LogWriter.class);
+ 
+     ma = SimpleMemoryAllocatorImpl.create(ooohml, stats, lw, 3, OffHeapStorage.MIN_SLAB_SIZE * 3,
+         OffHeapStorage.MIN_SLAB_SIZE);
+ 
+   }
+ 
+   /**
+    * Extracted from JUnit setUp() to reduce test overhead for cases where
+    * offheap memory isn't needed.
+    */
+   private void allocateOffHeapSerialized() {
+     Object regionEntryValue = getValue();
+     storedObject = createValueAsSerializedStoredObject(regionEntryValue);
+     deserializedRegionEntryValue = storedObject.getValueAsDeserializedHeapObject();
+     serializedRegionEntryValue = storedObject.getSerializedValue();
+   }
+ 
+   private void allocateOffHeapDeserialized() {
+     Object regionEntryValue = getValue();
+     storedObject = createValueAsUnserializedStoredObject(regionEntryValue);
+     deserializedRegionEntryValue = storedObject.getValueAsDeserializedHeapObject();
+     serializedRegionEntryValue = storedObject.getSerializedValue();
+   }
+ 
+   @After
+   public void tearDown() {
+     SimpleMemoryAllocatorImpl.freeOffHeapMemory();
+   }
+ 
+   @Override
+   public Object getValue() {
+     return Long.valueOf(Long.MAX_VALUE);
+   }
+ 
+   @Override
+   public byte[] getValueAsByteArray() {
+     return convertValueToByteArray(getValue());
+   }
+ 
+   private byte[] convertValueToByteArray(Object value) {
+     return ByteBuffer.allocate(Long.SIZE / Byte.SIZE).putLong((Long) value).array();
+   }
+ 
+   @Override
+   public Object convertByteArrayToObject(byte[] valueInByteArray) {
+     return ByteBuffer.wrap(valueInByteArray).getLong();
+   }
+ 
+   @Override
+   public Object convertSerializedByteArrayToObject(byte[] valueInSerializedByteArray) {
+     return EntryEventImpl.deserialize(valueInSerializedByteArray);
+   }
+ 
+   @Override
+   protected MemoryChunkWithRefCount createValueAsUnserializedStoredObject(Object value) {
+     byte[] valueInByteArray;
+     if (value instanceof Long) {
+       valueInByteArray = convertValueToByteArray(value);
+     } else {
+       valueInByteArray = (byte[]) value;
+     }
+ 
+     boolean isSerialized = false;
+     boolean isCompressed = false;
+ 
+     MemoryChunkWithRefCount createdObject = createChunk(valueInByteArray, isSerialized, isCompressed);
+     return createdObject;
+   }
+ 
+   @Override
+   protected MemoryChunkWithRefCount createValueAsSerializedStoredObject(Object value) {
+     byte[] valueInSerializedByteArray = EntryEventImpl.serialize(value);
+ 
+     boolean isSerialized = true;
+     boolean isCompressed = false;
+ 
+     MemoryChunkWithRefCount createdObject = createChunk(valueInSerializedByteArray, isSerialized, isCompressed);
+     return createdObject;
+   }
+ 
 -  private GemFireChunk createChunk(byte[] v, boolean isSerialized, boolean isCompressed) {
 -    GemFireChunk chunk = (GemFireChunk) ma.allocateAndInitialize(v, isSerialized, isCompressed, GemFireChunk.TYPE);
++  private ObjectChunk createChunk(byte[] v, boolean isSerialized, boolean isCompressed) {
++    ObjectChunk chunk = (ObjectChunk) ma.allocateAndInitialize(v, isSerialized, isCompressed);
+     return chunk;
+   }
+ 
+   @Test
+   public void getHeapFormOfSerializedStoredObjectReturnsDeserializedObject() {
+     allocateOffHeapSerialized();
+     Object heapObject = OffHeapHelper.getHeapForm(storedObject);
+     assertThat("getHeapForm returns non-null object", heapObject, notNullValue());
+     assertThat("Heap and off heap objects are different objects", heapObject, is(not(storedObject)));
+     assertThat("Deserialzed values of offHeap object and returned object are equal", heapObject,
+         is(equalTo(deserializedRegionEntryValue)));
+   }
+ 
+   @Test
+   public void getHeapFormOfNonOffHeapObjectReturnsOriginal() {
+     Object testObject = getValue();
+     Object heapObject = OffHeapHelper.getHeapForm(testObject);
+     assertNotNull(heapObject);
+     assertSame(testObject, heapObject);
+   }
+ 
+   @Test
+   public void getHeapFormWithNullReturnsNull() {
+     Object testObject = null;
+     Object returnObject = OffHeapHelper.getHeapForm(testObject);
+     assertThat(returnObject, is(equalTo(null)));
+   }
+ 
+   @Test
+   public void copyAndReleaseWithNullReturnsNull() {
+     Object testObject = null;
+     Object returnObject = OffHeapHelper.copyAndReleaseIfNeeded(testObject);
+     assertThat(returnObject, is(equalTo(null)));
+   }
+ 
+   @Test
+   public void copyAndReleaseWithRetainedDeserializedObjectDecreasesRefCnt() {
+     allocateOffHeapDeserialized();
+     assertTrue(storedObject.retain());
+     assertThat("Retained chunk ref count", storedObject.getRefCount(), is(2));
+     OffHeapHelper.copyAndReleaseIfNeeded(storedObject);
+     assertThat("Chunk ref count decreases", storedObject.getRefCount(), is(1));
+   }
+ 
+   @Test
+   public void copyAndReleaseWithNonRetainedObjectDecreasesRefCnt() {
+     allocateOffHeapDeserialized();
+     // assertTrue(storedObject.retain());
+     assertThat("Retained chunk ref count", storedObject.getRefCount(), is(1));
+     OffHeapHelper.copyAndReleaseIfNeeded(storedObject);
+     assertThat("Chunk ref count decreases", storedObject.getRefCount(), is(0));
+   }
+ 
+   @Test
+   public void copyAndReleaseWithDeserializedReturnsValueOfOriginal() {
+     allocateOffHeapDeserialized();
+     assertTrue(storedObject.retain());
+     Object returnObject = OffHeapHelper.copyAndReleaseIfNeeded(storedObject);
+     assertThat(returnObject, is(equalTo(deserializedRegionEntryValue)));
+   }
+ 
+   @Test
+   public void copyAndReleaseWithSerializedReturnsValueOfOriginal() {
+     allocateOffHeapSerialized();
+     assertTrue(storedObject.retain());
+     Object returnObject = ((VMCachedDeserializable) OffHeapHelper.copyAndReleaseIfNeeded(storedObject))
+         .getSerializedValue();
+     assertThat(returnObject, is(equalTo(serializedRegionEntryValue)));
+   }
+ 
+   @Test
+   public void copyAndReleaseNonStoredObjectReturnsOriginal() {
+     Object testObject = getValue();
+     Object returnObject = OffHeapHelper.copyAndReleaseIfNeeded(testObject);
+     assertThat(returnObject, is(testObject));
+   }
+ 
+   @Test
+   public void copyIfNeededWithNullReturnsNull() {
+     Object testObject = null;
+     Object returnObject = OffHeapHelper.copyAndReleaseIfNeeded(testObject);
+     assertThat(returnObject, is(equalTo(null)));
+   }
+ 
+   @Test
+   public void copyIfNeededNonOffHeapReturnsOriginal() {
+     Object testObject = getValue();
+     Object returnObject = OffHeapHelper.copyIfNeeded(testObject);
+     assertThat(returnObject, is(testObject));
+   }
+ 
+   @Test
+   public void copyIfNeededOffHeapSerializedReturnsValueOfOriginal() {
+     allocateOffHeapSerialized();
+     Object returnObject = ((VMCachedDeserializable) OffHeapHelper.copyIfNeeded(storedObject)).getSerializedValue();
+     assertThat(returnObject, is(equalTo(serializedRegionEntryValue)));
+   }
+ 
+   @Test
+   public void copyIfNeededOffHeapDeserializedReturnsOriginal() {
+     allocateOffHeapDeserialized();
+     Object returnObject = OffHeapHelper.copyIfNeeded(storedObject);
+     assertThat(returnObject, is(equalTo(deserializedRegionEntryValue)));
+   }
+ 
+   @Test
+   public void copyIfNeededWithOffHeapDeserializedObjDoesNotRelease() {
+     allocateOffHeapDeserialized();
+     int initialRefCountOfObject = storedObject.getRefCount();
+     OffHeapHelper.copyIfNeeded(storedObject);
+     assertThat("Ref count after copy", storedObject.getRefCount(), is(initialRefCountOfObject));
+   }
+ 
+   @Test
+   public void copyIfNeededWithOffHeapSerializedObjDoesNotRelease() {
+     allocateOffHeapSerialized();
+     int initialRefCountOfObject = storedObject.getRefCount();
+     OffHeapHelper.copyIfNeeded(storedObject);
+     assertThat("Ref count after copy", storedObject.getRefCount(), is(initialRefCountOfObject));
+   }
+ 
+   @Test
+   public void releaseOfOffHeapDecrementsRefCount() {
+     allocateOffHeapSerialized();
+     assertThat("Initial Ref Count", storedObject.getRefCount(), is(1));
+     OffHeapHelper.release(storedObject);
+     assertThat("Ref Count Decremented", storedObject.getRefCount(), is(0));
+   }
+ 
+   @Test
+   public void releaseOfOffHeapReturnsTrue() {
+     allocateOffHeapSerialized();
+     assertThat("Releasing OFfHeap object is true", OffHeapHelper.release(storedObject), is(true));
+   }
+ 
+   @Test
+   public void releaseOfNonOffHeapReturnsFalse() {
+     Object testObject = getValue();
+     assertThat("Releasing OFfHeap object is true", OffHeapHelper.release(testObject), is(false));
+   }
+ 
+   @Test
+   public void releaseWithOutTrackingOfOffHeapDecrementsRefCount() {
+     allocateOffHeapSerialized();
+     assertThat("Initial Ref Count", storedObject.getRefCount(), is(1));
+     OffHeapHelper.releaseWithNoTracking(storedObject);
+     assertThat("Ref Count Decremented", storedObject.getRefCount(), is(0));
+   }
+ 
+   @Test
+   public void releaseWithoutTrackingOfOffHeapReturnsTrue() {
+     allocateOffHeapSerialized();
+     assertThat("Releasing OFfHeap object is true", OffHeapHelper.releaseWithNoTracking(storedObject), is(true));
+   }
+ 
+   @Test
+   public void releaseWithoutTrackingOfNonOffHeapReturnsFalse() {
+     Object testObject = getValue();
+     assertThat("Releasing OFfHeap object is true", OffHeapHelper.releaseWithNoTracking(testObject), is(false));
+   }
+ 
+   @Test
+   public void releaseAndTrackOwnerOfOffHeapDecrementsRefCount() {
+     allocateOffHeapSerialized();
+     assertThat("Initial Ref Count", storedObject.getRefCount(), is(1));
+     OffHeapHelper.releaseAndTrackOwner(storedObject, "owner");
+     assertThat("Ref Count Decremented", storedObject.getRefCount(), is(0));
+   }
+ 
+   @Test
+   public void releaseAndTrackOwnerOfOffHeapReturnsTrue() {
+     allocateOffHeapSerialized();
+     assertThat("Releasing OFfHeap object is true", OffHeapHelper.releaseAndTrackOwner(storedObject, "owner"), is(true));
+   }
+ 
+   @Test
+   public void releaseAndTrackOwnerOfNonOffHeapReturnsFalse() {
+     Object testObject = getValue();
+     assertThat("Releasing OFfHeap object is true", OffHeapHelper.releaseAndTrackOwner(testObject, "owner"), is(false));
+   }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/OffHeapRegionBase.java
----------------------------------------------------------------------
diff --cc geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/OffHeapRegionBase.java
index 0000000,b515959..8de0406
mode 000000,100644..100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/OffHeapRegionBase.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/OffHeapRegionBase.java
@@@ -1,0 -1,593 +1,593 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package com.gemstone.gemfire.internal.offheap;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertNotNull;
+ import static org.junit.Assert.assertTrue;
+ import static org.junit.Assert.fail;
+ 
+ import java.io.Serializable;
+ import java.util.Arrays;
+ import java.util.Properties;
+ 
+ import org.junit.Test;
+ 
+ import com.gemstone.gemfire.OutOfOffHeapMemoryException;
+ import com.gemstone.gemfire.cache.CacheFactory;
+ import com.gemstone.gemfire.cache.EntryEvent;
+ import com.gemstone.gemfire.cache.Region;
+ import com.gemstone.gemfire.cache.RegionShortcut;
+ import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
+ import com.gemstone.gemfire.compression.Compressor;
+ import com.gemstone.gemfire.compression.SnappyCompressor;
+ import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+ import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+ import com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier;
+ import com.gemstone.gemfire.internal.offheap.annotations.Released;
+ import com.gemstone.gemfire.internal.offheap.annotations.Retained;
+ import com.gemstone.gemfire.pdx.PdxReader;
+ import com.gemstone.gemfire.pdx.PdxSerializable;
+ import com.gemstone.gemfire.pdx.PdxWriter;
+ import com.gemstone.gemfire.test.dunit.WaitCriterion;
+ 
+ /**
+  * Basic test of regions that use off heap storage.
+  * Subclasses exist for the different types of offheap store.
+  * 
+  * @author darrel
+  *
+  */
+ public abstract class OffHeapRegionBase {
+   
+   public abstract void configureOffHeapStorage();
+   public abstract void unconfigureOffHeapStorage();
+   public abstract int perObjectOverhead();
+ 
+   private GemFireCacheImpl createCache() {
+     return createCache(false);
+   }
+   private GemFireCacheImpl createCache(boolean isPersistent) {
+     configureOffHeapStorage();
+     Properties props = new Properties();
+     props.setProperty("locators", "");
+     props.setProperty("mcast-port", "0");
+     props.setProperty("off-heap-memory-size", getOffHeapMemorySize());
+     GemFireCacheImpl result = (GemFireCacheImpl) new CacheFactory(props).setPdxPersistent(isPersistent).create();
+     unconfigureOffHeapStorage();
+     return result;
+   }
+   private void closeCache(GemFireCacheImpl gfc, boolean keepOffHeapAllocated) {
+     gfc.close();
+     if (!keepOffHeapAllocated) {
+       SimpleMemoryAllocatorImpl.freeOffHeapMemory();
+     }
+     // TODO cleanup default disk store files
+   }
+   
+   protected abstract String getOffHeapMemorySize();
+   
+   @Test
+   public void testSizeAllocation() {
+     // prevent cache from closing in reaction to ooom
+     System.setProperty(OffHeapStorage.STAY_CONNECTED_ON_OUTOFOFFHEAPMEMORY_PROPERTY, "true");
+     GemFireCacheImpl gfc = createCache();
+     try {
+       MemoryAllocator ma = gfc.getOffHeapStore();
+       assertNotNull(ma);
+       final long offHeapSize = ma.getFreeMemory();
+       assertEquals(0, ma.getUsedMemory());
 -      MemoryChunk mc1 = ma.allocate(64, null);
++      MemoryChunk mc1 = ma.allocate(64);
+       assertEquals(64+perObjectOverhead(), ma.getUsedMemory());
+       assertEquals(offHeapSize-(64+perObjectOverhead()), ma.getFreeMemory());
+       mc1.release();
+       assertEquals(offHeapSize, ma.getFreeMemory());
+       assertEquals(0, ma.getUsedMemory());
+       // do an allocation larger than the slab size
+       // TODO: currently the compact will product slabs bigger than the max slab size
+       // (see the todo comment on compact() in SimpleMemoryAllocator).
+       // So we request 20m here since that it the total size.
+       try {
 -        ma.allocate(1024*1024*20, null);
++        ma.allocate(1024*1024*20);
+         fail("Expected an out of heap exception");
+       } catch (OutOfOffHeapMemoryException expected) {
+       }
+       assertEquals(0, ma.getUsedMemory());
+       assertFalse(gfc.isClosed());
+     } finally {
+       System.clearProperty(OffHeapStorage.STAY_CONNECTED_ON_OUTOFOFFHEAPMEMORY_PROPERTY);
+       closeCache(gfc, false);
+     }
+   }
+   
+   public void keep_testOutOfOffHeapMemoryErrorClosesCache() {
+     // this test is redundant but may be useful
+     final GemFireCacheImpl gfc = createCache();
+     try {
+       MemoryAllocator ma = gfc.getOffHeapStore();
+       assertNotNull(ma);
+       final long offHeapSize = ma.getFreeMemory();
+       assertEquals(0, ma.getUsedMemory());
 -      MemoryChunk mc1 = ma.allocate(64, null);
++      MemoryChunk mc1 = ma.allocate(64);
+       assertEquals(64+perObjectOverhead(), ma.getUsedMemory());
+       assertEquals(offHeapSize-(64+perObjectOverhead()), ma.getFreeMemory());
+       mc1.release();
+       assertEquals(offHeapSize, ma.getFreeMemory());
+       assertEquals(0, ma.getUsedMemory());
+       // do an allocation larger than the slab size
+       try {
 -        ma.allocate(1024*1024*10, null);
++        ma.allocate(1024*1024*10);
+         fail("Expected an out of heap exception");
+       } catch (OutOfOffHeapMemoryException expected) {
+         // passed
+       }
+       assertEquals(0, ma.getUsedMemory());
+       
+       final WaitCriterion waitForDisconnect = new WaitCriterion() {
+         public boolean done() {
+           return gfc.isClosed();
+         }
+         public String description() {
+           return "Waiting for disconnect to complete";
+         }
+       };
+       com.gemstone.gemfire.test.dunit.Wait.waitForCriterion(waitForDisconnect, 10*1000, 100, true);
+ 
+       assertTrue(gfc.isClosed());
+     } finally {
+       closeCache(gfc, false);
+     }
+   }
+ 
+   @Test
+   public void testByteArrayAllocation() {
+     GemFireCacheImpl gfc = createCache();
+     try {
+       MemoryAllocator ma = gfc.getOffHeapStore();
+       assertNotNull(ma);
+       final long offHeapSize = ma.getFreeMemory();
+       assertEquals(0, ma.getUsedMemory());
+       byte[] data = new byte[] {1,2,3,4,5,6,7,8};
 -      MemoryChunk mc1 = (MemoryChunk)ma.allocateAndInitialize(data, false, false, null);
++      MemoryChunk mc1 = (MemoryChunk)ma.allocateAndInitialize(data, false, false);
+       assertEquals(data.length+perObjectOverhead(), ma.getUsedMemory());
+       assertEquals(offHeapSize-(data.length+perObjectOverhead()), ma.getFreeMemory());
+       byte[] data2 = new byte[data.length];
+       mc1.readBytes(0, data2);
+       assertTrue(Arrays.equals(data, data2));
+       mc1.release();
+       assertEquals(offHeapSize, ma.getFreeMemory());
+       assertEquals(0, ma.getUsedMemory());
+       // try some small byte[] that don't need to be stored off heap.
+       data = new byte[] {1,2,3,4,5,6,7};
 -      StoredObject so1 = ma.allocateAndInitialize(data, false, false, null);
++      StoredObject so1 = ma.allocateAndInitialize(data, false, false);
+       assertEquals(0, ma.getUsedMemory());
+       assertEquals(offHeapSize, ma.getFreeMemory());
+       data2 = new byte[data.length];
+       data2 = (byte[])so1.getDeserializedForReading();
+       assertTrue(Arrays.equals(data, data2));
+     } finally {
+       closeCache(gfc, false);
+     }
+   }
+ 
+   private void doRegionTest(final RegionShortcut rs, final String rName) {
+     doRegionTest(rs, rName, false/*compressed*/);
+   }
+ 
+   @SuppressWarnings({ "rawtypes", "unchecked" })
+   private void doRegionTest(final RegionShortcut rs, final String rName, boolean compressed) {
+     boolean isPersistent = rs == RegionShortcut.LOCAL_PERSISTENT || rs == RegionShortcut.REPLICATE_PERSISTENT || rs == RegionShortcut.PARTITION_PERSISTENT;
+     GemFireCacheImpl gfc = createCache(isPersistent);
+     Region r = null;
+     try {
+       gfc.setCopyOnRead(true);
+       final MemoryAllocator ma = gfc.getOffHeapStore();
+       assertNotNull(ma);
+       assertEquals(0, ma.getUsedMemory());
+       Compressor compressor = null;
+       if (compressed) {
+         compressor = SnappyCompressor.getDefaultInstance();
+       }
+       r = gfc.createRegionFactory(rs).setOffHeap(true).setCompressor(compressor).create(rName);
+       assertEquals(true, r.isEmpty());
+       assertEquals(0, ma.getUsedMemory());
+       Object data = new Integer(123456789);
+       r.put("key1", data);
+       //System.out.println("After put of Integer value off heap used memory=" + ma.getUsedMemory());
+       assertTrue(ma.getUsedMemory() == 0);
+       assertEquals(data, r.get("key1"));
+       r.invalidate("key1");
+       assertEquals(0, ma.getUsedMemory());
+       r.put("key1", data);
+       assertTrue(ma.getUsedMemory() == 0);
+       long usedBeforeUpdate = ma.getUsedMemory();
+       r.put("key1", data);
+       assertEquals(usedBeforeUpdate, ma.getUsedMemory());
+       assertEquals(data, r.get("key1"));
+       r.destroy("key1");
+       assertEquals(0, ma.getUsedMemory());
+       
+       data = new Long(0x007FFFFFL);
+       r.put("key1", data);
+       if (!compressed) {
+         assertTrue(ma.getUsedMemory() == 0);
+       }
+       assertEquals(data, r.get("key1"));
+       data = new Long(0xFF8000000L);
+       r.put("key1", data);
+       if (!compressed) {
+         assertTrue(ma.getUsedMemory() == 0);
+       }
+       assertEquals(data, r.get("key1"));
+       
+       
+       // now lets set data to something that will be stored offheap
+       data = new Long(Long.MAX_VALUE);
+       r.put("key1", data);
+       assertEquals(data, r.get("key1"));
+       //System.out.println("After put of Integer value off heap used memory=" + ma.getUsedMemory());
+       assertTrue(ma.getUsedMemory() > 0);
+       data = new Long(Long.MIN_VALUE);
+       r.put("key1", data);
+       assertEquals(data, r.get("key1"));
+       //System.out.println("After put of Integer value off heap used memory=" + ma.getUsedMemory());
+       assertTrue(ma.getUsedMemory() > 0);
+       r.invalidate("key1");
+       assertEquals(0, ma.getUsedMemory());
+       r.put("key1", data);
+       assertTrue(ma.getUsedMemory() > 0);
+       usedBeforeUpdate = ma.getUsedMemory();
+       r.put("key1", data);
+       assertEquals(usedBeforeUpdate, ma.getUsedMemory());
+       assertEquals(data, r.get("key1"));
+       r.destroy("key1");
+       assertEquals(0, ma.getUsedMemory());
+ 
+       // confirm that byte[] do use off heap
+       {
+         byte[] originalBytes = new byte[1024];
+         Object oldV = r.put("byteArray", originalBytes);
+         long startUsedMemory = ma.getUsedMemory();
+         assertEquals(null, oldV);
+         byte[] readBytes = (byte[]) r.get("byteArray");
+         if (originalBytes == readBytes) {
+           fail("Expected different byte[] identity");
+         }
+         if (!Arrays.equals(readBytes, originalBytes)) {
+           fail("Expected byte array contents to be equal");
+         }
+         assertEquals(startUsedMemory, ma.getUsedMemory());
+         oldV = r.put("byteArray", originalBytes);
+         if (!compressed) {
+           assertEquals(null, oldV); // we default to old value being null for offheap
+         }
+         assertEquals(startUsedMemory, ma.getUsedMemory());
+         
+         readBytes = (byte[])r.putIfAbsent("byteArray", originalBytes);
+         if (originalBytes == readBytes) {
+           fail("Expected different byte[] identity");
+         }
+         if (!Arrays.equals(readBytes, originalBytes)) {
+           fail("Expected byte array contents to be equal");
+         }
+         assertEquals(startUsedMemory, ma.getUsedMemory());
+         if (!r.replace("byteArray", readBytes, originalBytes)) {
+           fail("Expected replace to happen");
+         }
+         assertEquals(startUsedMemory, ma.getUsedMemory());
+         byte[] otherBytes = new byte[1024];
+         otherBytes[1023] = 1;
+         if (r.replace("byteArray", otherBytes, originalBytes)) {
+           fail("Expected replace to not happen");
+         }
+         if (r.replace("byteArray", "bogus string", originalBytes)) {
+           fail("Expected replace to not happen");
+         }
+         if (r.remove("byteArray", "bogus string")) {
+           fail("Expected remove to not happen");
+         }
+         assertEquals(startUsedMemory, ma.getUsedMemory());
+         
+         if (!r.remove("byteArray", originalBytes)) {
+           fail("Expected remove to happen");
+         }
+         assertEquals(0, ma.getUsedMemory());
+         oldV = r.putIfAbsent("byteArray", "string value");
+         assertEquals(null, oldV);
+         assertEquals("string value", r.get("byteArray"));
+         if (r.replace("byteArray", "string valuE", originalBytes)) {
+           fail("Expected replace to not happen");
+         }
+         if (!r.replace("byteArray", "string value", originalBytes)) {
+           fail("Expected replace to happen");
+         }
+         oldV = r.destroy("byteArray"); // we default to old value being null for offheap
+         if (!compressed) {
+           assertEquals(null, oldV);
+         }
+         MyCacheListener listener = new MyCacheListener();
+         r.getAttributesMutator().addCacheListener(listener);
+         try {
+           Object valueToReplace = "string value1";
+           r.put("byteArray", valueToReplace);
+           assertEquals(null, listener.ohOldValue);
+           if (!compressed) {
+             assertEquals("string value1", listener.ohNewValue.getDeserializedForReading());
+             valueToReplace = listener.ohNewValue;
+           }
+           if (!r.replace("byteArray", valueToReplace, "string value2")) {
+             fail("expected replace to happen");
+           }
+           if (!compressed) {
+             assertEquals("string value2", listener.ohNewValue.getDeserializedForReading());
+             assertEquals("string value1", listener.ohOldValue.getDeserializedForReading());
+           }
+           // make sure that a custom equals/hashCode are not used when comparing values.
+           
+         } finally {
+           r.getAttributesMutator().removeCacheListener(listener);
+         }
+       }
+       assertTrue(ma.getUsedMemory() > 0);
+       {
+         Object key = "MyValueWithPartialEquals";
+         MyValueWithPartialEquals v1 = new MyValueWithPartialEquals("s1");
+         MyValueWithPartialEquals v2 = new MyValueWithPartialEquals("s2");
+         MyValueWithPartialEquals v3 = new MyValueWithPartialEquals("s1");
+         r.put(key, v1);
+         try {
+           if (r.replace(key, v2, "should not happen")) {
+             fail("v1 should not be equal to v2 on an offheap region");
+           }
+           if (!r.replace(key, v3, "should happen")) {
+             fail("v1 should be equal to v3 on an offheap region");
+           }
+           r.put(key, v1);
+           if (r.remove(key, v2)) {
+             fail("v1 should not be equal to v2 on an offheap region");
+           }
+           if (!r.remove(key, v3)) {
+             fail("v1 should be equal to v3 on an offheap region");
+           }
+         } finally {
+           r.remove(key);
+         }
+       }
+       {
+         Object key = "MyPdxWithPartialEquals";
+         MyPdxWithPartialEquals v1 = new MyPdxWithPartialEquals("s", "1");
+         MyPdxWithPartialEquals v2 = new MyPdxWithPartialEquals("s", "2");
+         MyPdxWithPartialEquals v3 = new MyPdxWithPartialEquals("t", "1");
+         r.put(key, v1);
+         try {
+           if (r.replace(key, v3, "should not happen")) {
+             fail("v1 should not be equal to v3 on an offheap region");
+           }
+           if (!r.replace(key, v2, "should happen")) {
+             fail("v1 should be equal to v2 on an offheap region");
+           }
+           r.put(key, v1);
+           if (r.remove(key, v3)) {
+             fail("v1 should not be equal to v3 on an offheap region");
+           }
+           if (!r.remove(key, v2)) {
+             fail("v1 should be equal to v2 on an offheap region");
+           }
+         } finally {
+           r.remove(key);
+         }
+       }
+       byte[] value = new byte[1024];
+       /*while (value != null) */ {
+         r.put("byteArray", value);
+       }
+       r.remove("byteArray");
+       assertEquals(0, ma.getUsedMemory());
+ 
+       r.put("key1", data);
+       assertTrue(ma.getUsedMemory() > 0);
+       r.invalidateRegion();
+       assertEquals(0, ma.getUsedMemory());
+ 
+       r.put("key1", data);
+       assertTrue(ma.getUsedMemory() > 0);
+       try {
+         r.clear();
+         assertEquals(0, ma.getUsedMemory());
+       } catch (UnsupportedOperationException ok) {
+       }
+ 
+       r.put("key1", data);
+       assertTrue(ma.getUsedMemory() > 0);
+       if (r.getAttributes().getDataPolicy().withPersistence()) {
+         r.put("key2", Integer.valueOf(1234567890));
+         r.put("key3", new Long(0x007FFFFFL));
+         r.put("key4", new Long(0xFF8000000L));
+         assertEquals(4, r.size());
+         r.close();
+         assertEquals(0, ma.getUsedMemory());
+         // simple test of recovery
+         r = gfc.createRegionFactory(rs).setOffHeap(true).create(rName);
+         assertEquals(4, r.size());
+         assertEquals(data, r.get("key1"));
+         assertEquals(Integer.valueOf(1234567890), r.get("key2"));
+         assertEquals(new Long(0x007FFFFFL), r.get("key3"));
+         assertEquals(new Long(0xFF8000000L), r.get("key4"));
+         closeCache(gfc, true);
+         assertEquals(0, ma.getUsedMemory());
+         gfc = createCache();
+         if (ma != gfc.getOffHeapStore()) {
+           fail("identity of offHeapStore changed when cache was recreated");
+         }
+         r = gfc.createRegionFactory(rs).setOffHeap(true).create(rName);
+         assertTrue(ma.getUsedMemory() > 0);
+         assertEquals(4, r.size());
+         assertEquals(data, r.get("key1"));
+         assertEquals(Integer.valueOf(1234567890), r.get("key2"));
+         assertEquals(new Long(0x007FFFFFL), r.get("key3"));
+         assertEquals(new Long(0xFF8000000L), r.get("key4"));
+       }
+         
+       r.destroyRegion();
+       assertEquals(0, ma.getUsedMemory());
+ 
+     } finally {
+       if (r != null && !r.isDestroyed()) {
+         r.destroyRegion();
+       }
+       closeCache(gfc, false);
+     }
+     
+   }
+   
+   /**
+    * This class has an equals that does not compare all its bytes.
+    */
+   private static class MyValueWithPartialEquals implements Serializable {
+     private static final long serialVersionUID = 1L;
+     private final String value;
+     public MyValueWithPartialEquals(String v) {
+       this.value = v;
+     }
+     @Override public boolean equals(Object other) {
+       if (other instanceof MyValueWithPartialEquals) {
+         MyValueWithPartialEquals o = (MyValueWithPartialEquals) other;
+         // just compare the first char
+         return this.value.charAt(0) == o.value.charAt(0);
+       } else {
+         return false;
+       }
+     }
+   }
+   /**
+    * This class has an equals that does not compare all its bytes.
+    */
+   private static class MyPdxWithPartialEquals implements PdxSerializable {
+     private String base;
+     private String value;
+     public MyPdxWithPartialEquals(String b, String v) {
+       this.base = b;
+       this.value = v;
+     }
+     public MyPdxWithPartialEquals() {
+     }
+     @Override
+     public void toData(PdxWriter writer) {
+       writer.writeString("base", this.base);
+       writer.writeString("value", this.value);
+       writer.markIdentityField("base");
+     }
+     @Override
+     public void fromData(PdxReader reader) {
+       this.base = reader.readString("base");
+       this.value = reader.readString("value");
+     }
+   }
+   
+   @SuppressWarnings("rawtypes")
+   private static class MyCacheListener extends CacheListenerAdapter {
+     @Retained(OffHeapIdentifier.TEST_OFF_HEAP_REGION_BASE_LISTENER)
+     public StoredObject ohOldValue;
+     @Retained(OffHeapIdentifier.TEST_OFF_HEAP_REGION_BASE_LISTENER)
+     public StoredObject ohNewValue;
+     
+     /**
+      * This method retains both ohOldValue and ohNewValue
+      */
+     @Retained(OffHeapIdentifier.TEST_OFF_HEAP_REGION_BASE_LISTENER)
+     private void setEventData(EntryEvent e) {
+       close();
+       EntryEventImpl event = (EntryEventImpl) e;
+       this.ohOldValue = event.getOffHeapOldValue();
+       this.ohNewValue = event.getOffHeapNewValue();
+     }
+     
+     @Override
+     public void afterCreate(EntryEvent e) {
+       setEventData(e);
+     }
+ 
+     @Override
+     public void afterDestroy(EntryEvent e) {
+       setEventData(e);
+     }
+ 
+     @Override
+     public void afterInvalidate(EntryEvent e) {
+       setEventData(e);
+     }
+ 
+     @Override
+     public void afterUpdate(EntryEvent e) {
+       setEventData(e);
+     }
+     
+     @Released(OffHeapIdentifier.TEST_OFF_HEAP_REGION_BASE_LISTENER)
+     @Override
+     public void close() {
 -      if (this.ohOldValue instanceof Chunk) {
 -        ((Chunk)this.ohOldValue).release();
++      if (this.ohOldValue instanceof ObjectChunk) {
++        ((ObjectChunk)this.ohOldValue).release();
+       }
 -      if (this.ohNewValue instanceof Chunk) {
 -        ((Chunk)this.ohNewValue).release();
++      if (this.ohNewValue instanceof ObjectChunk) {
++        ((ObjectChunk)this.ohNewValue).release();
+       }
+     }
+   }
+   
+   @Test
+   public void testPR() {
+     doRegionTest(RegionShortcut.PARTITION, "pr1");
+   }
+   @Test
+   public void testPRCompressed() {
+     doRegionTest(RegionShortcut.PARTITION, "pr2", true);
+   }
+   @Test
+   public void testReplicate() {
+     doRegionTest(RegionShortcut.REPLICATE, "rep1");
+   }
+   @Test
+   public void testReplicateCompressed() {
+     doRegionTest(RegionShortcut.REPLICATE, "rep2", true);
+   }
+   @Test
+   public void testLocal() {
+     doRegionTest(RegionShortcut.LOCAL, "local1");
+   }
+   @Test
+   public void testLocalCompressed() {
+     doRegionTest(RegionShortcut.LOCAL, "local2", true);
+   }
+   @Test
+   public void testLocalPersistent() {
+     doRegionTest(RegionShortcut.LOCAL_PERSISTENT, "localPersist1");
+   }
+   @Test
+   public void testLocalPersistentCompressed() {
+     doRegionTest(RegionShortcut.LOCAL_PERSISTENT, "localPersist2", true);
+   }
+   @Test
+   public void testPRPersistent() {
+     doRegionTest(RegionShortcut.PARTITION_PERSISTENT, "prPersist1");
+   }
+   @Test
+   public void testPRPersistentCompressed() {
+     doRegionTest(RegionShortcut.PARTITION_PERSISTENT, "prPersist2", true);
+   }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/OffHeapRegionEntryHelperJUnitTest.java
----------------------------------------------------------------------
diff --cc geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/OffHeapRegionEntryHelperJUnitTest.java
index 0000000,b800977..5d53109
mode 000000,100644..100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/OffHeapRegionEntryHelperJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/OffHeapRegionEntryHelperJUnitTest.java
@@@ -1,0 -1,870 +1,870 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package com.gemstone.gemfire.internal.offheap;
+ 
+ import static org.assertj.core.api.Assertions.assertThat;
+ import static org.mockito.Mockito.mock;
+ import static org.mockito.Mockito.never;
+ import static org.mockito.Mockito.reset;
+ import static org.mockito.Mockito.times;
+ import static org.mockito.Mockito.verify;
+ import static org.mockito.Mockito.when;
+ 
+ import java.nio.ByteBuffer;
+ 
+ import org.junit.After;
+ import org.junit.Before;
+ import org.junit.Test;
+ import org.junit.experimental.categories.Category;
+ import org.junit.runner.RunWith;
+ import org.mockito.Mockito;
+ import org.powermock.api.mockito.PowerMockito;
+ import org.powermock.core.classloader.annotations.PowerMockIgnore;
+ import org.powermock.core.classloader.annotations.PrepareForTest;
+ import org.powermock.modules.junit4.PowerMockRunner;
+ 
+ import com.gemstone.gemfire.LogWriter;
+ import com.gemstone.gemfire.compression.Compressor;
+ import com.gemstone.gemfire.internal.DSCODE;
+ import com.gemstone.gemfire.internal.cache.CachePerfStats;
+ import com.gemstone.gemfire.internal.cache.DiskEntry;
+ import com.gemstone.gemfire.internal.cache.DiskId;
+ import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+ import com.gemstone.gemfire.internal.cache.OffHeapRegionEntry;
+ import com.gemstone.gemfire.internal.cache.RegionEntryContext;
+ import com.gemstone.gemfire.internal.cache.Token;
+ import com.gemstone.gemfire.internal.cache.VMCachedDeserializable;
+ import com.gemstone.gemfire.internal.cache.VersionedStatsDiskRegionEntryOffHeap;
+ import com.gemstone.gemfire.test.junit.categories.UnitTest;
+ 
+ @Category(UnitTest.class)
+ @RunWith(PowerMockRunner.class)
+ @PowerMockIgnore("*.UnitTest")
 -@PrepareForTest({ Chunk.class, OffHeapRegionEntryHelper.class })
++@PrepareForTest({ ObjectChunk.class, OffHeapRegionEntryHelper.class })
+ public class OffHeapRegionEntryHelperJUnitTest {
+ 
+   private static final Long VALUE_IS_NOT_ENCODABLE = 0L;
+ 
+   private MemoryAllocator ma;
+ 
+   @Before
+   public void setUp() {
+     OutOfOffHeapMemoryListener ooohml = mock(OutOfOffHeapMemoryListener.class);
+     OffHeapMemoryStats stats = mock(OffHeapMemoryStats.class);
+     LogWriter lw = mock(LogWriter.class);
+ 
+     ma = SimpleMemoryAllocatorImpl.create(ooohml, stats, lw, 1, OffHeapStorage.MIN_SLAB_SIZE * 1, OffHeapStorage.MIN_SLAB_SIZE);
+   }
+ 
+   @After
+   public void tearDown() {
+     SimpleMemoryAllocatorImpl.freeOffHeapMemory();
+   }
+ 
 -  private GemFireChunk createChunk(Object value) {
++  private ObjectChunk createChunk(Object value) {
+     byte[] v = EntryEventImpl.serialize(value);
+ 
+     boolean isSerialized = true;
+     boolean isCompressed = false;
+ 
 -    GemFireChunk chunk = (GemFireChunk) ma.allocateAndInitialize(v, isSerialized, isCompressed, GemFireChunk.TYPE);
++    ObjectChunk chunk = (ObjectChunk) ma.allocateAndInitialize(v, isSerialized, isCompressed);
+ 
+     return chunk;
+   }
+ 
+   @Test
+   public void encodeDataAsAddressShouldReturnZeroIfValueIsGreaterThanSevenBytes() {
+     Long value = Long.MAX_VALUE;
+ 
+     byte[] valueInBytes = ByteBuffer.allocate(Long.SIZE / Byte.SIZE).putLong(value).array();
+     boolean isSerialized = false;
+     boolean isCompressed = false;
+ 
+     assertThat(valueInBytes.length).isGreaterThanOrEqualTo(OffHeapRegionEntryHelper.MAX_LENGTH_FOR_DATA_AS_ADDRESS);
+ 
+     long encodedAddress = OffHeapRegionEntryHelper.encodeDataAsAddress(valueInBytes, isSerialized, isCompressed);
+ 
+     assertThat(encodedAddress).isEqualTo(VALUE_IS_NOT_ENCODABLE);
+   }
+ 
+   @Test
+   public void encodeDataAsAddressShouldEncodeLongIfItsSerializedAndIfItsNotTooBig() {
+     Long value = 0L;
+     long expectedEncodedAddress = 123L;
+ 
+     byte[] valueInBytes = EntryEventImpl.serialize(value);
+     boolean isSerialized = true;
+     boolean isCompressed = false;
+ 
+     long encodedAddress = OffHeapRegionEntryHelper.encodeDataAsAddress(valueInBytes, isSerialized, isCompressed);
+ 
+     assertThat(encodedAddress).isEqualTo(expectedEncodedAddress);
+     assertSerializedAndCompressedBits(encodedAddress, isSerialized, isCompressed);
+   }
+ 
+   @Test
+   public void encodeDataAsAddressShouldReturnZeroIfValueIsLongAndItIsSerializedAndBig() {
+     Long value = Long.MAX_VALUE;
+ 
+     byte[] valueInBytes = EntryEventImpl.serialize(value);
+     boolean isSerialized = true;
+     boolean isCompressed = false;
+ 
+     long encodedAddress = OffHeapRegionEntryHelper.encodeDataAsAddress(valueInBytes, isSerialized, isCompressed);
+ 
+     assertThat(encodedAddress).isEqualTo(VALUE_IS_NOT_ENCODABLE);
+   }
+ 
+   @Test
+   public void encodeDataAsAddressShouldReturnZeroIfValueIsLargerThanEightBytesAndNotLong() {
+     byte[] someValue = new byte[8];
+     someValue[0] = DSCODE.CLASS;
+ 
+     boolean isSerialized = true;
+     boolean isCompressed = false;
+ 
+     long encodedAddress = OffHeapRegionEntryHelper.encodeDataAsAddress(someValue, isSerialized, isCompressed);
+ 
+     assertThat(encodedAddress).isEqualTo(VALUE_IS_NOT_ENCODABLE);
+   }
+ 
+   @Test
+   public void encodeDataAsAddressShouldReturnValidAddressIfValueIsLesserThanSevenBytes() {
+     long expectedAddress = 549755813697L;
+ 
+     Integer value = Integer.MAX_VALUE;
+ 
+     byte[] valueInBytes = ByteBuffer.allocate(Integer.SIZE / Byte.SIZE).putInt(value).array();
+     boolean isSerialized = false;
+     boolean isCompressed = false;
+ 
+     long encodedAddress = OffHeapRegionEntryHelper.encodeDataAsAddress(valueInBytes, isSerialized, isCompressed);
+ 
+     assertThat(encodedAddress).isEqualTo(expectedAddress);
+     assertSerializedAndCompressedBits(encodedAddress, isSerialized, isCompressed);
+   }
+ 
+   @Test
+   public void encodeDataAsAssressShouldSetSerialziedBitIfSerizliaed() {
+     long expectedAddress = 63221918596947L;
+ 
+     Integer value = Integer.MAX_VALUE;
+ 
+     byte[] valueInBytes = EntryEventImpl.serialize(value);
+     boolean isSerialized = true;
+     boolean isCompressed = false;
+ 
+     long encodedAddress = OffHeapRegionEntryHelper.encodeDataAsAddress(valueInBytes, isSerialized, isCompressed);
+ 
+     assertThat(expectedAddress).isEqualTo(encodedAddress);
+     assertSerializedAndCompressedBits(encodedAddress, isSerialized, isCompressed);
+   }
+ 
+   @Test
+   public void encodeDataAsAssressShouldSetSerialziedBitIfCompressed() {
+     long expectedAddress = 549755813701L;
+ 
+     Integer value = Integer.MAX_VALUE;
+ 
+     byte[] valueInBytes = ByteBuffer.allocate(Integer.SIZE / Byte.SIZE).putInt(value).array();
+     boolean isSerialized = false;
+     boolean isCompressed = true;
+ 
+     long encodedAddress = OffHeapRegionEntryHelper.encodeDataAsAddress(valueInBytes, isSerialized, isCompressed);
+ 
+     assertThat(encodedAddress).isEqualTo(expectedAddress);
+     assertSerializedAndCompressedBits(encodedAddress, isSerialized, isCompressed);
+   }
+ 
+   @Test
+   public void encodeDataAsAssressShouldSetBothSerialziedAndCompressedBitsIfSerializedAndCompressed() {
+     long expectedAddress = 63221918596951L;
+ 
+     Integer value = Integer.MAX_VALUE;
+ 
+     byte[] valueInBytes = EntryEventImpl.serialize(value);
+     boolean isSerialized = true;
+     boolean isCompressed = true;
+ 
+     long encodedAddress = OffHeapRegionEntryHelper.encodeDataAsAddress(valueInBytes, isSerialized, isCompressed);
+ 
+     assertThat(expectedAddress).isEqualTo(encodedAddress);
+     assertSerializedAndCompressedBits(encodedAddress, isSerialized, isCompressed);
+   }
+ 
+   private void assertSerializedAndCompressedBits(long encodedAddress, boolean shouldSerializedBitBeSet, boolean shouldCompressedBitBeSet) {
+     boolean isSerializedBitSet = (encodedAddress & OffHeapRegionEntryHelper.SERIALIZED_BIT) == OffHeapRegionEntryHelper.SERIALIZED_BIT ? true : false;
+     boolean isCompressedBitSet = (encodedAddress & OffHeapRegionEntryHelper.COMPRESSED_BIT) == OffHeapRegionEntryHelper.COMPRESSED_BIT ? true : false;
+ 
+     assertThat(isSerializedBitSet).isEqualTo(shouldSerializedBitBeSet);
+     assertThat(isCompressedBitSet).isEqualTo(shouldCompressedBitBeSet);
+   }
+ 
+   @Test
+   public void decodeAddressToBytesShouldReturnActualBytes() {
+     long encodedAddress = 549755813697L;
+     Integer value = Integer.MAX_VALUE;
+ 
+     byte[] actual = OffHeapRegionEntryHelper.decodeAddressToBytes(encodedAddress, false, false);
+     byte[] expectedValue = ByteBuffer.allocate(Integer.SIZE / Byte.SIZE).putInt(value).array();
+ 
+     assertThat(actual).isEqualTo(expectedValue);
+   }
+ 
+   @Test
+   public void decodeDataAsAddressShouldDecodeLongIfItsSerializedAndIfItsNotTooBig() {
+     Long value = 0L;
+     long encodedAddress = 123L;
+ 
+     byte[] actual = OffHeapRegionEntryHelper.decodeAddressToBytes(encodedAddress, false, false);
+     byte[] expectedValue = EntryEventImpl.serialize(value);
+ 
+     assertThat(actual).isEqualTo(expectedValue);
+   }
+ 
+   @Test(expected = UnsupportedOperationException.class)
+   public void decodeDataAsAddressShouldThrowExceptionIfDataIsCompressedAndItsNotOkToBeCompressed() {
+     long encodedAddress = 549755813703L;
+     OffHeapRegionEntryHelper.decodeAddressToBytes(encodedAddress, true, false);
+   }
+ 
+   @Test
+   public void encodedAddressShouldBeDecodableEvenIfValueIsSerialized() {
+     Integer value = Integer.MAX_VALUE;
+ 
+     byte[] serializedValue = EntryEventImpl.serialize(value);
+     boolean isSerialized = true;
+     boolean isCompressed = false;
+ 
+     long encodedAddress = OffHeapRegionEntryHelper.encodeDataAsAddress(serializedValue, isSerialized, isCompressed);
+ 
+     Integer actualValue = (Integer) OffHeapRegionEntryHelper.decodeAddressToObject(encodedAddress);
+ 
+     assertThat(actualValue).isEqualTo(value);
+   }
+ 
+   @Test
+   public void encodedAddressShouldBeDecodableEvenIfValueIsUnserialized() {
+     Integer value = Integer.MAX_VALUE;
+ 
+     byte[] unSerializedValue = ByteBuffer.allocate(Integer.SIZE / Byte.SIZE).putInt(value).array();
+     boolean isSerialized = false;
+     boolean isCompressed = false;
+ 
+     long encodedAddress = OffHeapRegionEntryHelper.encodeDataAsAddress(unSerializedValue, isSerialized, isCompressed);
+ 
+     byte[] actualValue = (byte[]) OffHeapRegionEntryHelper.decodeAddressToObject(encodedAddress);
+ 
+     assertThat(actualValue).isEqualTo(unSerializedValue);
+   }
+ 
+   @Test
+   public void isSerializedShouldReturnTrueIfSerialized() {
+     assertThat(OffHeapRegionEntryHelper.isSerialized(1000010L)).isTrue();
+   }
+ 
+   @Test
+   public void isSerializedShouldReturnFalseIfNotSerialized() {
+     assertThat(OffHeapRegionEntryHelper.isSerialized(1000000L)).isFalse();
+   }
+ 
+   @Test
+   public void isCompressedShouldReturnTrueIfCompressed() {
+     assertThat(OffHeapRegionEntryHelper.isCompressed(1000100L)).isTrue();
+   }
+ 
+   @Test
+   public void isCompressedShouldReturnFalseIfNotCompressed() {
+     assertThat(OffHeapRegionEntryHelper.isCompressed(1000000L)).isFalse();
+   }
+ 
+   @Test
+   public void isOffHeapShouldReturnTrueIfAddressIsOnOffHeap() {
 -    Chunk value = createChunk(Long.MAX_VALUE);
++    ObjectChunk value = createChunk(Long.MAX_VALUE);
+     assertThat(OffHeapRegionEntryHelper.isOffHeap(value.getMemoryAddress())).isTrue();
+   }
+ 
+   @Test
+   public void isOffHeapShouldReturnFalseIfAddressIsAnEncodedAddress() {
+     byte[] data = ByteBuffer.allocate(Integer.SIZE / Byte.SIZE).putInt((Integer) Integer.MAX_VALUE).array();
+     long address = OffHeapRegionEntryHelper.encodeDataAsAddress(data, false, false);
+     assertThat(OffHeapRegionEntryHelper.isOffHeap(address)).isFalse();
+   }
+ 
+   @Test
+   public void isOffHeapShouldReturnFalseForAnyTokenAddress() {
+     assertThat(OffHeapRegionEntryHelper.isOffHeap(OffHeapRegionEntryHelper.NULL_ADDRESS)).isFalse();
+     assertThat(OffHeapRegionEntryHelper.isOffHeap(OffHeapRegionEntryHelper.INVALID_ADDRESS)).isFalse();
+     assertThat(OffHeapRegionEntryHelper.isOffHeap(OffHeapRegionEntryHelper.LOCAL_INVALID_ADDRESS)).isFalse();
+     assertThat(OffHeapRegionEntryHelper.isOffHeap(OffHeapRegionEntryHelper.DESTROYED_ADDRESS)).isFalse();
+     assertThat(OffHeapRegionEntryHelper.isOffHeap(OffHeapRegionEntryHelper.REMOVED_PHASE1_ADDRESS)).isFalse();
+     assertThat(OffHeapRegionEntryHelper.isOffHeap(OffHeapRegionEntryHelper.REMOVED_PHASE2_ADDRESS)).isFalse();
+     assertThat(OffHeapRegionEntryHelper.isOffHeap(OffHeapRegionEntryHelper.END_OF_STREAM_ADDRESS)).isFalse();
+     assertThat(OffHeapRegionEntryHelper.isOffHeap(OffHeapRegionEntryHelper.NOT_AVAILABLE_ADDRESS)).isFalse();
+     assertThat(OffHeapRegionEntryHelper.isOffHeap(OffHeapRegionEntryHelper.TOMBSTONE_ADDRESS)).isFalse();
+   }
+ 
+   @Test
+   public void setValueShouldChangeTheRegionEntryAddressToNewAddress() {
+     // mock region entry
+     OffHeapRegionEntry re = mock(OffHeapRegionEntry.class);
+ 
+     // some old address
+     long oldAddress = 1L;
+ 
+     // testing when the newValue is a chunk
 -    Chunk newValue = createChunk(Long.MAX_VALUE);
++    ObjectChunk newValue = createChunk(Long.MAX_VALUE);
+     // mock region entry methods required for test
+     when(re.getAddress()).thenReturn(oldAddress);
+     when(re.setAddress(oldAddress, newValue.getMemoryAddress())).thenReturn(Boolean.TRUE);
+ 
+     // invoke the method under test
+     OffHeapRegionEntryHelper.setValue(re, newValue);
+ 
+     // verify oldAddress is replaced with newAddress
+     verify(re, times(1)).setAddress(oldAddress, newValue.getMemoryAddress());
+     // resetting the spy in-order to re-use
+     reset(re);
+ 
+     // testing when the newValue is DataAsAddress
+     DataAsAddress newAddress1 = new DataAsAddress(2L);
+     // mock region entry methods required for test
+     when(re.getAddress()).thenReturn(oldAddress);
+     when(re.setAddress(oldAddress, newAddress1.getEncodedAddress())).thenReturn(Boolean.TRUE);
+     OffHeapRegionEntryHelper.setValue(re, newAddress1);
+ 
+     // verify oldAddress is replaced with newAddress
+     verify(re, times(1)).setAddress(oldAddress, newAddress1.getEncodedAddress());
+     reset(re);
+ 
+     // Testing when newValue is Token Objects
+ 
+     // mock region entry methods required for test
+     when(re.getAddress()).thenReturn(oldAddress);
+     when(re.setAddress(oldAddress, OffHeapRegionEntryHelper.NULL_ADDRESS)).thenReturn(Boolean.TRUE);
+     OffHeapRegionEntryHelper.setValue(re, null);
+ 
+     // verify oldAddress is replaced with newAddress
+     verify(re, times(1)).setAddress(oldAddress, OffHeapRegionEntryHelper.NULL_ADDRESS);
+     reset(re);
+ 
+     // mock region entry methods required for test
+     when(re.getAddress()).thenReturn(oldAddress);
+     when(re.setAddress(oldAddress, OffHeapRegionEntryHelper.INVALID_ADDRESS)).thenReturn(Boolean.TRUE);
+     OffHeapRegionEntryHelper.setValue(re, Token.INVALID);
+ 
+     // verify oldAddress is replaced with newAddress
+     verify(re, times(1)).setAddress(oldAddress, OffHeapRegionEntryHelper.INVALID_ADDRESS);
+     reset(re);
+ 
+     // mock region entry methods required for test
+     when(re.getAddress()).thenReturn(oldAddress);
+     when(re.setAddress(oldAddress, OffHeapRegionEntryHelper.LOCAL_INVALID_ADDRESS)).thenReturn(Boolean.TRUE);
+     OffHeapRegionEntryHelper.setValue(re, Token.LOCAL_INVALID);
+ 
+     // verify oldAddress is replaced with newAddress
+     verify(re, times(1)).setAddress(oldAddress, OffHeapRegionEntryHelper.LOCAL_INVALID_ADDRESS);
+     reset(re);
+ 
+     // mock region entry methods required for test
+     when(re.getAddress()).thenReturn(oldAddress);
+     when(re.setAddress(oldAddress, OffHeapRegionEntryHelper.DESTROYED_ADDRESS)).thenReturn(Boolean.TRUE);
+     OffHeapRegionEntryHelper.setValue(re, Token.DESTROYED);
+ 
+     // verify oldAddress is replaced with newAddress
+     verify(re, times(1)).setAddress(oldAddress, OffHeapRegionEntryHelper.DESTROYED_ADDRESS);
+     reset(re);
+ 
+     // mock region entry methods required for test
+     when(re.getAddress()).thenReturn(oldAddress);
+     when(re.setAddress(oldAddress, OffHeapRegionEntryHelper.REMOVED_PHASE1_ADDRESS)).thenReturn(Boolean.TRUE);
+     OffHeapRegionEntryHelper.setValue(re, Token.REMOVED_PHASE1);
+ 
+     // verify oldAddress is replaced with newAddress
+     verify(re, times(1)).setAddress(oldAddress, OffHeapRegionEntryHelper.REMOVED_PHASE1_ADDRESS);
+     reset(re);
+ 
+     // mock region entry methods required for test
+     when(re.getAddress()).thenReturn(oldAddress);
+     when(re.setAddress(oldAddress, OffHeapRegionEntryHelper.REMOVED_PHASE2_ADDRESS)).thenReturn(Boolean.TRUE);
+     OffHeapRegionEntryHelper.setValue(re, Token.REMOVED_PHASE2);
+ 
+     // verify oldAddress is replaced with newAddress
+     verify(re, times(1)).setAddress(oldAddress, OffHeapRegionEntryHelper.REMOVED_PHASE2_ADDRESS);
+     reset(re);
+ 
+     // mock region entry methods required for test
+     when(re.getAddress()).thenReturn(oldAddress);
+     when(re.setAddress(oldAddress, OffHeapRegionEntryHelper.END_OF_STREAM_ADDRESS)).thenReturn(Boolean.TRUE);
+     OffHeapRegionEntryHelper.setValue(re, Token.END_OF_STREAM);
+ 
+     // verify oldAddress is replaced with newAddress
+     verify(re, times(1)).setAddress(oldAddress, OffHeapRegionEntryHelper.END_OF_STREAM_ADDRESS);
+     reset(re);
+ 
+     // mock region entry methods required for test
+     when(re.getAddress()).thenReturn(oldAddress);
+     when(re.setAddress(oldAddress, OffHeapRegionEntryHelper.NOT_AVAILABLE_ADDRESS)).thenReturn(Boolean.TRUE);
+     OffHeapRegionEntryHelper.setValue(re, Token.NOT_AVAILABLE);
+ 
+     // verify oldAddress is replaced with newAddress
+     verify(re, times(1)).setAddress(oldAddress, OffHeapRegionEntryHelper.NOT_AVAILABLE_ADDRESS);
+     reset(re);
+ 
+     // mock region entry methods required for test
+     when(re.getAddress()).thenReturn(oldAddress);
+     when(re.setAddress(oldAddress, OffHeapRegionEntryHelper.TOMBSTONE_ADDRESS)).thenReturn(Boolean.TRUE);
+     OffHeapRegionEntryHelper.setValue(re, Token.TOMBSTONE);
+ 
+     // verify oldAddress is replaced with newAddress
+     verify(re, times(1)).setAddress(oldAddress, OffHeapRegionEntryHelper.TOMBSTONE_ADDRESS);
+     reset(re);
+   }
+ 
+   @Test
+   public void setValueShouldChangeTheRegionEntryAddressToNewAddressAndReleaseOldValueIfItsOnOffHeap() {
+     // mock region entry
+     OffHeapRegionEntry re = mock(OffHeapRegionEntry.class);
+ 
 -    Chunk oldValue = createChunk(Long.MAX_VALUE);
 -    Chunk newValue = createChunk(Long.MAX_VALUE - 1);
++    ObjectChunk oldValue = createChunk(Long.MAX_VALUE);
++    ObjectChunk newValue = createChunk(Long.MAX_VALUE - 1);
+ 
+     // mock Chunk static methods - in-order to verify that release is called
 -    PowerMockito.spy(Chunk.class);
 -    PowerMockito.doNothing().when(Chunk.class);
 -    Chunk.release(oldValue.getMemoryAddress(), true);
++    PowerMockito.spy(ObjectChunk.class);
++    PowerMockito.doNothing().when(ObjectChunk.class);
++    ObjectChunk.release(oldValue.getMemoryAddress());
+ 
+     // mock region entry methods required for test
+     when(re.getAddress()).thenReturn(oldValue.getMemoryAddress());
+     when(re.setAddress(oldValue.getMemoryAddress(), newValue.getMemoryAddress())).thenReturn(Boolean.TRUE);
+ 
+     // invoke the method under test
+     OffHeapRegionEntryHelper.setValue(re, newValue);
+ 
+     // verify oldAddress is changed to newAddress
+     verify(re, times(1)).setAddress(oldValue.getMemoryAddress(), newValue.getMemoryAddress());
+ 
+     // verify oldAddress is released
+     PowerMockito.verifyStatic();
 -    Chunk.release(oldValue.getMemoryAddress(), true);
++    ObjectChunk.release(oldValue.getMemoryAddress());
+   }
+ 
+   @Test
+   public void setValueShouldChangeTheRegionEntryAddressToNewAddressAndDoesNothingIfOldAddressIsAnEncodedAddress() {
+     // mock region entry
+     OffHeapRegionEntry re = mock(OffHeapRegionEntry.class);
+ 
+     byte[] oldData = ByteBuffer.allocate(Integer.SIZE / Byte.SIZE).putInt((Integer) Integer.MAX_VALUE).array();
+     long oldAddress = OffHeapRegionEntryHelper.encodeDataAsAddress(oldData, false, false);
+ 
+     byte[] newData = ByteBuffer.allocate(Integer.SIZE / Byte.SIZE).putInt((Integer) Integer.MAX_VALUE - 1).array();
+     DataAsAddress newAddress = new DataAsAddress(OffHeapRegionEntryHelper.encodeDataAsAddress(newData, false, false));
+ 
+     // mock Chunk static methods - in-order to verify that release is never called
 -    PowerMockito.spy(Chunk.class);
 -    PowerMockito.doNothing().when(Chunk.class);
 -    Chunk.release(oldAddress, true);
++    PowerMockito.spy(ObjectChunk.class);
++    PowerMockito.doNothing().when(ObjectChunk.class);
++    ObjectChunk.release(oldAddress);
+ 
+     // mock region entry methods required for test
+     when(re.getAddress()).thenReturn(oldAddress);
+     when(re.setAddress(oldAddress, newAddress.getEncodedAddress())).thenReturn(Boolean.TRUE);
+ 
+     // invoke the method under test
+     OffHeapRegionEntryHelper.setValue(re, newAddress);
+ 
+     // verify oldAddress is changed to newAddress
+     verify(re, times(1)).setAddress(oldAddress, newAddress.getEncodedAddress());
+ 
+     // verify that release is never called as the old address is not on offheap
+     PowerMockito.verifyStatic(never());
 -    Chunk.release(oldAddress, true);
++    ObjectChunk.release(oldAddress);
+   }
+ 
+   @Test
+   public void setValueShouldChangeTheRegionEntryAddressToNewAddressAndDoesNothingIfOldAddressIsATokenAddress() {
+     // mock region entry
+     OffHeapRegionEntry re = mock(OffHeapRegionEntry.class);
+ 
+     long oldAddress = OffHeapRegionEntryHelper.REMOVED_PHASE1_ADDRESS;
+ 
+     Token newValue = Token.REMOVED_PHASE2;
+     long newAddress = OffHeapRegionEntryHelper.REMOVED_PHASE2_ADDRESS;
+ 
+     // mock Chunk static methods - in-order to verify that release is never called
 -    PowerMockito.spy(Chunk.class);
 -    PowerMockito.doNothing().when(Chunk.class);
 -    Chunk.release(oldAddress, true);
++    PowerMockito.spy(ObjectChunk.class);
++    PowerMockito.doNothing().when(ObjectChunk.class);
++    ObjectChunk.release(oldAddress);
+ 
+     // mock region entry methods required for test
+     when(re.getAddress()).thenReturn(oldAddress);
+     when(re.setAddress(oldAddress, newAddress)).thenReturn(Boolean.TRUE);
+ 
+     // invoke the method under test
+     OffHeapRegionEntryHelper.setValue(re, newValue);
+ 
+     // verify oldAddress is changed to newAddress
+     verify(re, times(1)).setAddress(oldAddress, newAddress);
+ 
+     // verify that release is never called as the old address is not on offheap
+     PowerMockito.verifyStatic(never());
 -    Chunk.release(oldAddress, true);
++    ObjectChunk.release(oldAddress);
+   }
+ 
+   @Test(expected = IllegalStateException.class)
+   public void setValueShouldThrowIllegalExceptionIfNewValueCannotBeConvertedToAddress() {
+     // mock region entry
+     OffHeapRegionEntry re = mock(OffHeapRegionEntry.class);
+ 
+     // mock region entry methods required for test
+     when(re.getAddress()).thenReturn(1L);
+ 
+     // invoke the method under test with some object other than Chunk/DataAsAddress/Token
+     OffHeapRegionEntryHelper.setValue(re, new Object());
+   }
+ 
+   @Test
+   public void getValueAsTokenShouldReturnNotATokenIfValueIsOnOffHeap() {
+     // mock region entry
+     OffHeapRegionEntry re = mock(OffHeapRegionEntry.class);
+ 
 -    Chunk chunk = createChunk(Long.MAX_VALUE);
++    ObjectChunk chunk = createChunk(Long.MAX_VALUE);
+ 
+     // mock region entry methods required for test
+     when(re.getAddress()).thenReturn(chunk.getMemoryAddress());
+     Token token = OffHeapRegionEntryHelper.getValueAsToken(re);
+ 
+     assertThat(token).isEqualTo(Token.NOT_A_TOKEN);
+   }
+ 
+   @Test
+   public void getValueAsTokenShouldReturnNotATokenIfValueIsEncoded() {
+     // mock region entry
+     OffHeapRegionEntry re = mock(OffHeapRegionEntry.class);
+ 
+     byte[] data = ByteBuffer.allocate(Integer.SIZE / Byte.SIZE).putInt(Integer.MAX_VALUE).array();
+     long address = OffHeapRegionEntryHelper.encodeDataAsAddress(data, false, false);
+ 
+     // mock region entry methods required for test
+     when(re.getAddress()).thenReturn(address);
+     Token token = OffHeapRegionEntryHelper.getValueAsToken(re);
+ 
+     assertThat(token).isEqualTo(Token.NOT_A_TOKEN);
+   }
+ 
+   @Test
+   public void getValueAsTokenShouldReturnAValidToken() {
+     // mock region entry
+     OffHeapRegionEntry re = mock(OffHeapRegionEntry.class);
+ 
+     // mock region entry methods required for test
+     when(re.getAddress()).thenReturn(OffHeapRegionEntryHelper.NULL_ADDRESS);
+     Token token = OffHeapRegionEntryHelper.getValueAsToken(re);
+ 
+     assertThat(token).isNull();
+ 
+     // mock region entry methods required for test
+     when(re.getAddress()).thenReturn(OffHeapRegionEntryHelper.INVALID_ADDRESS);
+     token = OffHeapRegionEntryHelper.getValueAsToken(re);
+ 
+     assertThat(token).isEqualTo(Token.INVALID);
+ 
+     // mock region entry methods required for test
+     when(re.getAddress()).thenReturn(OffHeapRegionEntryHelper.LOCAL_INVALID_ADDRESS);
+     token = OffHeapRegionEntryHelper.getValueAsToken(re);
+ 
+     assertThat(token).isEqualTo(Token.LOCAL_INVALID);
+ 
+     // mock region entry methods required for test
+     when(re.getAddress()).thenReturn(OffHeapRegionEntryHelper.DESTROYED_ADDRESS);
+     token = OffHeapRegionEntryHelper.getValueAsToken(re);
+ 
+     assertThat(token).isEqualTo(Token.DESTROYED);
+ 
+     // mock region entry methods required for test
+     when(re.getAddress()).thenReturn(OffHeapRegionEntryHelper.REMOVED_PHASE1_ADDRESS);
+     token = OffHeapRegionEntryHelper.getValueAsToken(re);
+ 
+     assertThat(token).isEqualTo(Token.REMOVED_PHASE1);
+ 
+     // mock region entry methods required for test
+     when(re.getAddress()).thenReturn(OffHeapRegionEntryHelper.REMOVED_PHASE2_ADDRESS);
+     token = OffHeapRegionEntryHelper.getValueAsToken(re);
+ 
+     assertThat(token).isEqualTo(Token.REMOVED_PHASE2);
+ 
+     // mock region entry methods required for test
+     when(re.getAddress()).thenReturn(OffHeapRegionEntryHelper.END_OF_STREAM_ADDRESS);
+     token = OffHeapRegionEntryHelper.getValueAsToken(re);
+ 
+     assertThat(token).isEqualTo(Token.END_OF_STREAM);
+ 
+     // mock region entry methods required for test
+     when(re.getAddress()).thenReturn(OffHeapRegionEntryHelper.NOT_AVAILABLE_ADDRESS);
+     token = OffHeapRegionEntryHelper.getValueAsToken(re);
+ 
+     assertThat(token).isEqualTo(Token.NOT_AVAILABLE);
+ 
+     // mock region entry methods required for test
+     when(re.getAddress()).thenReturn(OffHeapRegionEntryHelper.TOMBSTONE_ADDRESS);
+     token = OffHeapRegionEntryHelper.getValueAsToken(re);
+ 
+     assertThat(token).isEqualTo(Token.TOMBSTONE);
+   }
+ 
+   @Test
+   public void addressToObjectShouldReturnValueFromChunk() {
 -    Chunk expected = createChunk(Long.MAX_VALUE);
++    ObjectChunk expected = createChunk(Long.MAX_VALUE);
+     Object actual = OffHeapRegionEntryHelper.addressToObject(expected.getMemoryAddress(), false, null);
+ 
 -    assertThat(actual).isInstanceOf(Chunk.class);
++    assertThat(actual).isInstanceOf(ObjectChunk.class);
+     assertThat(actual).isEqualTo(expected);
+   }
+ 
+   @Test
+   public void addressToObjectShouldReturnCachedDeserializableFromChunkIfAskedToDecompress() {
+     byte[] data = EntryEventImpl.serialize(Long.MAX_VALUE);
+     boolean isSerialized = true;
+     boolean isCompressed = true;
+ 
 -    GemFireChunk chunk = (GemFireChunk) ma.allocateAndInitialize(data, isSerialized, isCompressed, GemFireChunk.TYPE);
++    ObjectChunk chunk = (ObjectChunk) ma.allocateAndInitialize(data, isSerialized, isCompressed);
+ 
+     // create the mock context
+     RegionEntryContext regionContext = mock(RegionEntryContext.class);
+     CachePerfStats cacheStats = mock(CachePerfStats.class);
+     Compressor compressor = mock(Compressor.class);
+ 
+     long startTime = 10000L;
+ 
+     // mock required things
+     when(regionContext.getCompressor()).thenReturn(compressor);
+     when(compressor.decompress(data)).thenReturn(data);
+     when(regionContext.getCachePerfStats()).thenReturn(cacheStats);
+     when(cacheStats.startDecompression()).thenReturn(startTime);
+ 
+     Object actual = OffHeapRegionEntryHelper.addressToObject(chunk.getMemoryAddress(), true, regionContext);
+ 
+     assertThat(actual).isInstanceOf(VMCachedDeserializable.class);
+ 
+     Long actualValue = (Long) ((VMCachedDeserializable) actual).getDeserializedForReading();
+     assertThat(actualValue).isEqualTo(Long.MAX_VALUE);
+   }
+ 
+   @Test
+   public void addressToObjectShouldReturnDecompressedValueFromChunkIfAskedToDecompress() {
+     byte[] data = ByteBuffer.allocate(Long.SIZE / Byte.SIZE).putLong(Long.MAX_VALUE).array();
+     boolean isSerialized = false;
+     boolean isCompressed = true;
+ 
 -    GemFireChunk chunk = (GemFireChunk) ma.allocateAndInitialize(data, isSerialized, isCompressed, GemFireChunk.TYPE);
++    ObjectChunk chunk = (ObjectChunk) ma.allocateAndInitialize(data, isSerialized, isCompressed);
+ 
+     // create the mock context
+     RegionEntryContext regionContext = mock(RegionEntryContext.class);
+     CachePerfStats cacheStats = mock(CachePerfStats.class);
+     Compressor compressor = mock(Compressor.class);
+ 
+     long startTime = 10000L;
+ 
+     // mock required things
+     when(regionContext.getCompressor()).thenReturn(compressor);
+     when(compressor.decompress(data)).thenReturn(data);
+     when(regionContext.getCachePerfStats()).thenReturn(cacheStats);
+     when(cacheStats.startDecompression()).thenReturn(startTime);
+ 
+     Object actual = OffHeapRegionEntryHelper.addressToObject(chunk.getMemoryAddress(), true, regionContext);
+ 
+     assertThat(actual).isInstanceOf(byte[].class);
+     assertThat(actual).isEqualTo(data);
+   }
+ 
+   @Test
+   public void addressToObjectShouldReturnValueFromDataAsAddress() {
+     byte[] data = ByteBuffer.allocate(Integer.SIZE / Byte.SIZE).putInt(Integer.MAX_VALUE).array();
+     long address = OffHeapRegionEntryHelper.encodeDataAsAddress(data, false, false);
+ 
+     DataAsAddress expected = new DataAsAddress(address);
+     Object actual = OffHeapRegionEntryHelper.addressToObject(address, false, null);
+ 
+     assertThat(actual).isInstanceOf(DataAsAddress.class);
+     assertThat(actual).isEqualTo(expected);
+   }
+ 
+   @Test
+   public void addressToObjectShouldReturnCachedDeserializableFromSerializedDataAsAddressIfAskedToDecompress() {
+     byte[] data = EntryEventImpl.serialize(Integer.MAX_VALUE);
+     boolean isSerialized = true;
+     boolean isCompressed = true;
+ 
+     long address = OffHeapRegionEntryHelper.encodeDataAsAddress(data, isSerialized, isCompressed);
+ 
+     // create the mock context
+     RegionEntryContext regionContext = mock(RegionEntryContext.class);
+     CachePerfStats cacheStats = mock(CachePerfStats.class);
+     Compressor compressor = mock(Compressor.class);
+ 
+     long startTime = 10000L;
+ 
+     // mock required things
+     when(regionContext.getCompressor()).thenReturn(compressor);
+     when(compressor.decompress(data)).thenReturn(data);
+     when(regionContext.getCachePerfStats()).thenReturn(cacheStats);
+     when(cacheStats.startDecompression()).thenReturn(startTime);
+ 
+     Object actual = OffHeapRegionEntryHelper.addressToObject(address, true, regionContext);
+ 
+     assertThat(actual).isInstanceOf(VMCachedDeserializable.class);
+ 
+     Integer actualValue = (Integer) ((VMCachedDeserializable) actual).getDeserializedForReading();
+     assertThat(actualValue).isEqualTo(Integer.MAX_VALUE);
+   }
+ 
+   @Test
+   public void addressToObjectShouldReturnDecompressedValueFromDataAsAddressIfAskedToDecompress() {
+     byte[] data = ByteBuffer.allocate(Integer.SIZE / Byte.SIZE).putInt(Integer.MAX_VALUE).array();
+     boolean isSerialized = false;
+     boolean isCompressed = true;
+ 
+     long address = OffHeapRegionEntryHelper.encodeDataAsAddress(data, isSerialized, isCompressed);
+ 
+     // create the mock context
+     RegionEntryContext regionContext = mock(RegionEntryContext.class);
+     CachePerfStats cacheStats = mock(CachePerfStats.class);
+     Compressor compressor = mock(Compressor.class);
+ 
+     long startTime = 10000L;
+ 
+     // mock required things
+     when(regionContext.getCompressor()).thenReturn(compressor);
+     when(compressor.decompress(data)).thenReturn(data);
+     when(regionContext.getCachePerfStats()).thenReturn(cacheStats);
+     when(cacheStats.startDecompression()).thenReturn(startTime);
+ 
+     Object actual = OffHeapRegionEntryHelper.addressToObject(address, true, regionContext);
+ 
+     assertThat(actual).isInstanceOf(byte[].class);
+     assertThat(actual).isEqualTo(data);
+   }
+ 
+   @Test
+   public void addressToObjectShouldReturnToken() {
+     Token token = (Token) OffHeapRegionEntryHelper.addressToObject(OffHeapRegionEntryHelper.NULL_ADDRESS, false, null);
+     assertThat(token).isNull();
+ 
+     token = (Token) OffHeapRegionEntryHelper.addressToObject(OffHeapRegionEntryHelper.INVALID_ADDRESS, false, null);
+     assertThat(token).isEqualTo(Token.INVALID);
+ 
+     token = (Token) OffHeapRegionEntryHelper.addressToObject(OffHeapRegionEntryHelper.LOCAL_INVALID_ADDRESS, false, null);
+     assertThat(token).isEqualTo(Token.LOCAL_INVALID);
+ 
+     token = (Token) OffHeapRegionEntryHelper.addressToObject(OffHeapRegionEntryHelper.DESTROYED_ADDRESS, false, null);
+     assertThat(token).isEqualTo(Token.DESTROYED);
+ 
+     token = (Token) OffHeapRegionEntryHelper.addressToObject(OffHeapRegionEntryHelper.REMOVED_PHASE1_ADDRESS, false, null);
+     assertThat(token).isEqualTo(Token.REMOVED_PHASE1);
+ 
+     token = (Token) OffHeapRegionEntryHelper.addressToObject(OffHeapRegionEntryHelper.REMOVED_PHASE2_ADDRESS, false, null);
+     assertThat(token).isEqualTo(Token.REMOVED_PHASE2);
+ 
+     token = (Token) OffHeapRegionEntryHelper.addressToObject(OffHeapRegionEntryHelper.END_OF_STREAM_ADDRESS, false, null);
+     assertThat(token).isEqualTo(Token.END_OF_STREAM);
+ 
+     token = (Token) OffHeapRegionEntryHelper.addressToObject(OffHeapRegionEntryHelper.NOT_AVAILABLE_ADDRESS, false, null);
+     assertThat(token).isEqualTo(Token.NOT_AVAILABLE);
+ 
+     token = (Token) OffHeapRegionEntryHelper.addressToObject(OffHeapRegionEntryHelper.TOMBSTONE_ADDRESS, false, null);
+     assertThat(token).isEqualTo(Token.TOMBSTONE);
+   }
+ 
+   @Test
+   public void getSerializedLengthFromDataAsAddressShouldReturnValidLength() {
+     byte[] data = ByteBuffer.allocate(Integer.SIZE / Byte.SIZE).putInt(Integer.MAX_VALUE).array();
+     boolean isSerialized = false;
+     boolean isCompressed = true;
+ 
+     long address = OffHeapRegionEntryHelper.encodeDataAsAddress(data, isSerialized, isCompressed);
+     DataAsAddress daa = new DataAsAddress(address);
+ 
+     int actualLength = OffHeapRegionEntryHelper.getSerializedLengthFromDataAsAddress(daa);
+ 
+     assertThat(actualLength).isEqualTo(data.length);
+   }
+ 
+   @Test
+   public void getSerializedLengthFromDataAsAddressShouldReturnZeroForNonEncodedAddress() {
+     DataAsAddress nonEncodedAddress = new DataAsAddress(100000L);
+     int actualLength = OffHeapRegionEntryHelper.getSerializedLengthFromDataAsAddress(nonEncodedAddress);
+     assertThat(actualLength).isZero();
+   }
+ 
+   @Test
+   public void releaseEntryShouldSetValueToRemovePhase2() {
+     // mock region entry
+     OffHeapRegionEntry re = mock(OffHeapRegionEntry.class);
+     when(re.getAddress()).thenReturn(1L);
+     when(re.setAddress(1L, OffHeapRegionEntryHelper.REMOVED_PHASE2_ADDRESS)).thenReturn(Boolean.TRUE);
+ 
+     // mock required methods
+     PowerMockito.spy(OffHeapRegionEntryHelper.class);
+     PowerMockito.doNothing().when(OffHeapRegionEntryHelper.class);
+     OffHeapRegionEntryHelper.setValue(re, Token.REMOVED_PHASE2);
+ 
+     OffHeapRegionEntryHelper.releaseEntry(re);
+ 
+     PowerMockito.verifyStatic();
+     OffHeapRegionEntryHelper.setValue(re, Token.REMOVED_PHASE2);
+   }
+ 
+   @Test
+   public void releaseEntryShouldSetValueToRemovePhase2AndSetsAsyncToFalseForDiskEntry() {
+     // mock region entry
+     OffHeapRegionEntry re = mock(VersionedStatsDiskRegionEntryOffHeap.class);
+     when(re.getAddress()).thenReturn(1L);
+     when(re.setAddress(1L, OffHeapRegionEntryHelper.REMOVED_PHASE2_ADDRESS)).thenReturn(Boolean.TRUE);
+ 
+     DiskId spy = Mockito.spy(DiskId.class);
+     when(((DiskEntry) re).getDiskId()).thenReturn(spy);
+     when(spy.isPendingAsync()).thenReturn(Boolean.TRUE);
+ 
+     // mock required methods
+     PowerMockito.spy(OffHeapRegionEntryHelper.class);
+     PowerMockito.doNothing().when(OffHeapRegionEntryHelper.class);
+     OffHeapRegionEntryHelper.setValue(re, Token.REMOVED_PHASE2);
+ 
+     OffHeapRegionEntryHelper.releaseEntry(re);
+ 
+     verify(spy, times(1)).setPendingAsync(Boolean.FALSE);
+ 
+     PowerMockito.verifyStatic();
+     OffHeapRegionEntryHelper.setValue(re, Token.REMOVED_PHASE2);
+   }
+   
+   @Test
+   public void doWithOffHeapClearShouldSetTheThreadLocalToTrue() {
+     // verify that threadlocal is not set
+     assertThat(OffHeapRegionEntryHelper.doesClearNeedToCheckForOffHeap()).isFalse();
+ 
+     OffHeapRegionEntryHelper.doWithOffHeapClear(new Runnable() {
+       @Override
+       public void run() {
+         // verify that threadlocal is set when offheap is cleared
+         assertThat(OffHeapRegionEntryHelper.doesClearNeedToCheckForOffHeap()).isTrue();
+       }
+     });
+ 
+     // verify that threadlocal is reset after offheap is cleared
+     assertThat(OffHeapRegionEntryHelper.doesClearNeedToCheckForOffHeap()).isFalse();
+   }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/OffHeapStorageJUnitTest.java
----------------------------------------------------------------------
diff --cc geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/OffHeapStorageJUnitTest.java
index 0000000,f0f0461..d5db4e4
mode 000000,100755..100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/OffHeapStorageJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/OffHeapStorageJUnitTest.java
@@@ -1,0 -1,285 +1,285 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package com.gemstone.gemfire.internal.offheap;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertNotNull;
+ import static org.junit.Assert.assertTrue;
+ import static org.junit.Assert.fail;
+ import static org.mockito.Mockito.*;
+ 
+ import org.junit.After;
+ import org.junit.Before;
+ import org.junit.Rule;
+ import org.junit.Test;
+ import org.junit.contrib.java.lang.system.RestoreSystemProperties;
+ import org.junit.experimental.categories.Category;
+ 
+ import com.gemstone.gemfire.OutOfOffHeapMemoryException;
+ import com.gemstone.gemfire.StatisticsFactory;
+ import com.gemstone.gemfire.distributed.DistributedSystem;
+ import com.gemstone.gemfire.distributed.internal.DistributionStats;
+ import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+ import com.gemstone.gemfire.distributed.internal.InternalLocator;
+ import com.gemstone.gemfire.internal.LocalStatisticsFactory;
+ import com.gemstone.gemfire.test.junit.categories.UnitTest;
+ 
+ @Category(UnitTest.class)
+ public class OffHeapStorageJUnitTest {
+   @Rule
+   public final RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();
+ 
+   private final static long MEGABYTE = 1024 * 1024;
+   private final static long GIGABYTE = 1024 * 1024 * 1024;
+ 
+   @Before
+   public void setUp() throws Exception {
+   }
+ 
+   @After
+   public void tearDown() throws Exception {
+   }
+ 
+   @Test
+   public void testParseOffHeapMemorySizeNegative() {
+     assertEquals(0, OffHeapStorage.parseOffHeapMemorySize("-1"));
+   }
+   @Test
+   public void testParseOffHeapMemorySizeNull() {
+     assertEquals(0, OffHeapStorage.parseOffHeapMemorySize(null));
+   }
+   @Test
+   public void testParseOffHeapMemorySizeEmpty() {
+     assertEquals(0, OffHeapStorage.parseOffHeapMemorySize(""));
+   }
+   @Test
+   public void testParseOffHeapMemorySizeBytes() {
+     assertEquals(MEGABYTE, OffHeapStorage.parseOffHeapMemorySize("1"));
+     assertEquals(Integer.MAX_VALUE * MEGABYTE, OffHeapStorage.parseOffHeapMemorySize("" + Integer.MAX_VALUE));
+   }
+   @Test
+   public void testParseOffHeapMemorySizeKiloBytes() {
+     try {
+       OffHeapStorage.parseOffHeapMemorySize("1k");
+       fail("Did not receive expected IllegalArgumentException");
+     } catch (IllegalArgumentException expected) {
+       // Expected
+     }
+   }
+   @Test
+   public void testParseOffHeapMemorySizeMegaBytes() {
+     assertEquals(MEGABYTE, OffHeapStorage.parseOffHeapMemorySize("1m"));
+     assertEquals(Integer.MAX_VALUE * MEGABYTE, OffHeapStorage.parseOffHeapMemorySize("" + Integer.MAX_VALUE + "m"));
+   }
+   @Test
+   public void testParseOffHeapMemorySizeGigaBytes() {
+     assertEquals(GIGABYTE, OffHeapStorage.parseOffHeapMemorySize("1g"));
+     assertEquals(Integer.MAX_VALUE * GIGABYTE, OffHeapStorage.parseOffHeapMemorySize("" + Integer.MAX_VALUE + "g"));
+   }
+   @Test
+   public void testCalcMaxSlabSize() {
+     assertEquals(100, OffHeapStorage.calcMaxSlabSize(100L));
+     assertEquals(Integer.MAX_VALUE, OffHeapStorage.calcMaxSlabSize(Long.MAX_VALUE));
+     try {
+       System.setProperty("gemfire.OFF_HEAP_SLAB_SIZE", "99");
+       assertEquals(99*1024*1024, OffHeapStorage.calcMaxSlabSize(100L*1024*1024));
+       assertEquals(88, OffHeapStorage.calcMaxSlabSize(88));
+       System.setProperty("gemfire.OFF_HEAP_SLAB_SIZE", "88m");
+       assertEquals(88*1024*1024, OffHeapStorage.calcMaxSlabSize(100L*1024*1024));
+       System.setProperty("gemfire.OFF_HEAP_SLAB_SIZE", "77M");
+       assertEquals(77*1024*1024, OffHeapStorage.calcMaxSlabSize(100L*1024*1024));
+       System.setProperty("gemfire.OFF_HEAP_SLAB_SIZE", "1g");
+       assertEquals(1*1024*1024*1024, OffHeapStorage.calcMaxSlabSize(2L*1024*1024*1024));
+       System.setProperty("gemfire.OFF_HEAP_SLAB_SIZE", "1G");
+       assertEquals(1L*1024*1024*1024, OffHeapStorage.calcMaxSlabSize(2L*1024*1024*1024+1));
+       System.setProperty("gemfire.OFF_HEAP_SLAB_SIZE", "foobarG");
+       try {
+         OffHeapStorage.calcMaxSlabSize(100);
+         fail("expected IllegalArgumentException");
+       } catch (IllegalArgumentException expected) {
+       }
+       System.setProperty("gemfire.OFF_HEAP_SLAB_SIZE", "");
+       assertEquals(100, OffHeapStorage.calcMaxSlabSize(100L));
+       assertEquals(Integer.MAX_VALUE, OffHeapStorage.calcMaxSlabSize(Long.MAX_VALUE));
+     } finally {
+       System.clearProperty("gemfire.OFF_HEAP_SLAB_SIZE");
+     }
+   }
+   @Test
+   public void createOffHeapStorageReturnsNullIfForceLocator() {
+     System.setProperty(InternalLocator.FORCE_LOCATOR_DM_TYPE, "true");
+     assertEquals(null, OffHeapStorage.createOffHeapStorage(null, null, 1, null));
+   }
+   @Test
+   public void createOffHeapStorageReturnsNullIfMemorySizeIsZero() {
+     assertEquals(null, OffHeapStorage.createOffHeapStorage(null, null, 0, null));
+   }
+   @Test
+   public void exceptionIfSlabCountTooSmall() {
+     StatisticsFactory statsFactory = mock(StatisticsFactory.class);
+     try {
+       OffHeapStorage.createOffHeapStorage(null, statsFactory, OffHeapStorage.MIN_SLAB_SIZE-1, null);
+     } catch (IllegalArgumentException expected) {
+       expected.getMessage().equals("The amount of off heap memory must be at least " + OffHeapStorage.MIN_SLAB_SIZE + " but it was set to " + (OffHeapStorage.MIN_SLAB_SIZE-1));
+     }
+   }
+   @Test
+   public void exceptionIfDistributedSystemNull() {
+     StatisticsFactory statsFactory = mock(StatisticsFactory.class);
+     try {
+       OffHeapStorage.createOffHeapStorage(null, statsFactory, OffHeapStorage.MIN_SLAB_SIZE, (DistributedSystem)null);
+     } catch (IllegalArgumentException expected) {
+       expected.getMessage().equals("InternalDistributedSystem is null");
+     }
+   }
+   
+   @Test
+   public void createOffHeapStorageWorks() {
+     StatisticsFactory localStatsFactory = new LocalStatisticsFactory(null);
+     InternalDistributedSystem ids = mock(InternalDistributedSystem.class);
+     MemoryAllocator ma = OffHeapStorage.createOffHeapStorage(null, localStatsFactory, OffHeapStorage.MIN_SLAB_SIZE, ids);
+     System.setProperty(SimpleMemoryAllocatorImpl.FREE_OFF_HEAP_MEMORY_PROPERTY, "true");
+     ma.close();
+   }
+ 
+   @Test
+   public void testCreateOffHeapStorage() {
+     StatisticsFactory localStatsFactory = new LocalStatisticsFactory(null);
+     OutOfOffHeapMemoryListener ooohml = mock(OutOfOffHeapMemoryListener.class);
+     MemoryAllocator ma = OffHeapStorage.basicCreateOffHeapStorage(null, localStatsFactory, 1024*1024, ooohml);
+     try {
+       OffHeapMemoryStats stats = ma.getStats();
+       assertNotNull(stats.getStats());
+       assertEquals(1024*1024, stats.getFreeMemory());
+       assertEquals(1024*1024, stats.getMaxMemory());
+       assertEquals(0, stats.getUsedMemory());
+       assertEquals(0, stats.getCompactions());
+       assertEquals(0, stats.getCompactionTime());
+       assertEquals(0, stats.getFragmentation());
+       assertEquals(1, stats.getFragments());
+       assertEquals(1024*1024, stats.getLargestFragment());
+       assertEquals(0, stats.getObjects());
+       assertEquals(0, stats.getReads());
+ 
+       stats.incFreeMemory(100);
+       assertEquals(1024*1024+100, stats.getFreeMemory());
+       stats.incFreeMemory(-100);
+       assertEquals(1024*1024, stats.getFreeMemory());
+ 
+       stats.incMaxMemory(100);
+       assertEquals(1024*1024+100, stats.getMaxMemory());
+       stats.incMaxMemory(-100);
+       assertEquals(1024*1024, stats.getMaxMemory());
+ 
+       stats.incUsedMemory(100);
+       assertEquals(100, stats.getUsedMemory());
+       stats.incUsedMemory(-100);
+       assertEquals(0, stats.getUsedMemory());
+ 
+       stats.incObjects(100);
+       assertEquals(100, stats.getObjects());
+       stats.incObjects(-100);
+       assertEquals(0, stats.getObjects());
+ 
+       stats.incReads();
+       assertEquals(1, stats.getReads());
+ 
+       stats.setFragmentation(100);
+       assertEquals(100, stats.getFragmentation());
+       stats.setFragmentation(0);
+       assertEquals(0, stats.getFragmentation());
+ 
+       stats.setFragments(2);
+       assertEquals(2, stats.getFragments());
+       stats.setFragments(1);
+       assertEquals(1, stats.getFragments());
+ 
+       stats.setLargestFragment(100);
+       assertEquals(100, stats.getLargestFragment());
+       stats.setLargestFragment(1024*1024);
+       assertEquals(1024*1024, stats.getLargestFragment());
+ 
+       boolean originalEnableClockStats = DistributionStats.enableClockStats;
+       DistributionStats.enableClockStats = true;
+       try {
+         long start = stats.startCompaction();
+         while (stats.startCompaction() == start) {
+           Thread.yield();
+         }
+         stats.endCompaction(start);
+         assertEquals(1, stats.getCompactions());
+         assertTrue(stats.getCompactionTime() > 0);
+       } finally {
+         DistributionStats.enableClockStats = originalEnableClockStats;
+       }
+ 
+       stats.incObjects(100);
+       stats.incUsedMemory(100);
+       stats.setFragmentation(100);
+       OffHeapStorage ohs = (OffHeapStorage) stats;
+       ohs.initialize(new NullOffHeapMemoryStats());
+       assertEquals(0, stats.getFreeMemory());
+       assertEquals(0, stats.getMaxMemory());
+       assertEquals(0, stats.getUsedMemory());
+       assertEquals(0, stats.getCompactions());
+       assertEquals(0, stats.getCompactionTime());
+       assertEquals(0, stats.getFragmentation());
+       assertEquals(0, stats.getFragments());
+       assertEquals(0, stats.getLargestFragment());
+       assertEquals(0, stats.getObjects());
+       assertEquals(0, stats.getReads());
+ 
+       OutOfOffHeapMemoryException ex = null;
+       try {
 -        ma.allocate(1024*1024+1, null);
++        ma.allocate(1024*1024+1);
+         fail("expected OutOfOffHeapMemoryException");
+       } catch (OutOfOffHeapMemoryException expected) {
+         ex = expected;
+       }
+       verify(ooohml).outOfOffHeapMemory(ex);
+       try {
 -        ma.allocate(1024*1024+1, null);
++        ma.allocate(1024*1024+1);
+         fail("expected OutOfOffHeapMemoryException");
+       } catch (OutOfOffHeapMemoryException expected) {
+         ex = expected;
+       }
+       verify(ooohml).outOfOffHeapMemory(ex);
+ 
+     } finally {
+       System.setProperty(SimpleMemoryAllocatorImpl.FREE_OFF_HEAP_MEMORY_PROPERTY, "true");
+       try {
+         ma.close();
+       } finally {
+         System.clearProperty(SimpleMemoryAllocatorImpl.FREE_OFF_HEAP_MEMORY_PROPERTY);
+       }
+     }
+   }
+   @Test
+   public void testCalcSlabCount() {
+     final long MSS = OffHeapStorage.MIN_SLAB_SIZE;
+     assertEquals(100, OffHeapStorage.calcSlabCount(MSS*4, MSS*4*100));
+     assertEquals(100, OffHeapStorage.calcSlabCount(MSS*4, (MSS*4*100) + (MSS-1)));
+     assertEquals(101, OffHeapStorage.calcSlabCount(MSS*4, (MSS*4*100) + MSS));
+     assertEquals(Integer.MAX_VALUE, OffHeapStorage.calcSlabCount(MSS, MSS * Integer.MAX_VALUE));
+     assertEquals(Integer.MAX_VALUE, OffHeapStorage.calcSlabCount(MSS, (MSS * Integer.MAX_VALUE) + MSS-1));
+     try {
+       OffHeapStorage.calcSlabCount(MSS, (((long)MSS) * Integer.MAX_VALUE) + MSS);
+       fail("Expected IllegalArgumentException");
+     } catch (IllegalArgumentException expected) {
+     }
+   }
+ }