You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/02/22 19:36:22 UTC

[60/83] [abbrv] incubator-geode git commit: GEODE-917: Merge branch 'feature/GEODE-917' into develop

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/OldValueImporterTestBase.java
----------------------------------------------------------------------
diff --cc geode-core/src/test/java/com/gemstone/gemfire/internal/cache/OldValueImporterTestBase.java
index 0000000,f7d0714..690b55a
mode 000000,100644..100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/OldValueImporterTestBase.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/OldValueImporterTestBase.java
@@@ -1,0 -1,181 +1,181 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package com.gemstone.gemfire.internal.cache;
+ 
+ import static org.junit.Assert.*;
+ 
+ import java.io.IOException;
+ 
+ import org.junit.After;
+ import org.junit.Before;
+ import org.junit.Test;
+ 
+ import com.gemstone.gemfire.internal.HeapDataOutputStream;
+ import com.gemstone.gemfire.internal.cache.EntryEventImpl.OldValueImporter;
 -import com.gemstone.gemfire.internal.offheap.Chunk;
++import com.gemstone.gemfire.internal.offheap.ObjectChunk;
+ import com.gemstone.gemfire.internal.offheap.DataAsAddress;
+ import com.gemstone.gemfire.internal.offheap.NullOffHeapMemoryStats;
+ import com.gemstone.gemfire.internal.offheap.NullOutOfOffHeapMemoryListener;
+ import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl;
+ import com.gemstone.gemfire.internal.offheap.UnsafeMemoryChunk;
+ import com.gemstone.gemfire.internal.util.BlobHelper;
+ 
+ public abstract class OldValueImporterTestBase {
+   @Before
+   public void setUp() throws Exception {
+   }
+ 
+   @After
+   public void tearDown() throws Exception {
+   }
+   
+   protected abstract OldValueImporter createImporter();
+   protected abstract Object getOldValueFromImporter(OldValueImporter ovi);
+   protected abstract void toData(OldValueImporter ovi, HeapDataOutputStream hdos) throws IOException;
+   protected abstract void fromData(OldValueImporter ovi, byte[] bytes) throws IOException, ClassNotFoundException;
+ 
+   @Test
+   public void testValueSerialization() throws IOException, ClassNotFoundException {
+     byte[] bytes = new byte[1024];
+     HeapDataOutputStream hdos = new HeapDataOutputStream(bytes);
+     OldValueImporter imsg = createImporter();
+ 
+     // null byte array value
+     {
+       OldValueImporter omsg = createImporter();
+       omsg.importOldBytes(null, false);
+       toData(omsg, hdos);
+       fromData(imsg, bytes);
+       assertEquals(null, getOldValueFromImporter(imsg));
+     }
+     
+     // null object value
+     {
+       OldValueImporter omsg = createImporter();
+       omsg.importOldObject(null, true);
+       toData(omsg, hdos);
+       fromData(imsg, bytes);
+       assertEquals(null, getOldValueFromImporter(imsg));
+     }
+     
+     // simple byte array
+     {
+       byte[] baValue = new byte[] {1,2,3,4,5,6,7,8,9};
+       OldValueImporter omsg = createImporter();
+       omsg.importOldBytes(baValue, false);
+       hdos = new HeapDataOutputStream(bytes);
+       toData(omsg, hdos);
+       fromData(imsg, bytes);
+       assertArrayEquals(baValue, (byte[])getOldValueFromImporter(imsg));
+     }
+     
+     // String in serialized form
+     {
+       String stringValue = "1,2,3,4,5,6,7,8,9";
+       byte[] stringValueBlob = EntryEventImpl.serialize(stringValue);
+       OldValueImporter omsg = createImporter();
+       omsg.importOldBytes(stringValueBlob, true);
+       hdos = new HeapDataOutputStream(bytes);
+       toData(omsg, hdos);
+       fromData(imsg, bytes);
+       assertArrayEquals(stringValueBlob, ((VMCachedDeserializable)getOldValueFromImporter(imsg)).getSerializedValue());
+     }
+     
+     // String in object form
+     {
+       String stringValue = "1,2,3,4,5,6,7,8,9";
+       byte[] stringValueBlob = EntryEventImpl.serialize(stringValue);
+       OldValueImporter omsg = createImporter();
+       omsg.importOldObject(stringValue, true);
+       hdos = new HeapDataOutputStream(bytes);
+       toData(omsg, hdos);
+       fromData(imsg, bytes);
+       assertArrayEquals(stringValueBlob, ((VMCachedDeserializable)getOldValueFromImporter(imsg)).getSerializedValue());
+     }
+     
+     // off-heap DataAsAddress byte array
+     {
+       SimpleMemoryAllocatorImpl sma =
 -          SimpleMemoryAllocatorImpl.create(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{new UnsafeMemoryChunk(1024*1024)});
++          SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{new UnsafeMemoryChunk(1024*1024)});
+       try {
+         byte[] baValue = new byte[] {1,2};
 -        DataAsAddress baValueSO = (DataAsAddress) sma.allocateAndInitialize(baValue, false, false, null);
++        DataAsAddress baValueSO = (DataAsAddress) sma.allocateAndInitialize(baValue, false, false);
+         OldValueImporter omsg = createImporter();
+         omsg.importOldObject(baValueSO, false);
+         hdos = new HeapDataOutputStream(bytes);
+         toData(omsg, hdos);
+         fromData(imsg, bytes);
+         assertArrayEquals(baValue, (byte[])getOldValueFromImporter(imsg));
+       } finally {
+         SimpleMemoryAllocatorImpl.freeOffHeapMemory();
+       }
+     }
+     // off-heap Chunk byte array
+     {
+       SimpleMemoryAllocatorImpl sma =
 -          SimpleMemoryAllocatorImpl.create(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{new UnsafeMemoryChunk(1024*1024)});
++          SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{new UnsafeMemoryChunk(1024*1024)});
+       try {
+         byte[] baValue = new byte[] {1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17};
 -        Chunk baValueSO = (Chunk) sma.allocateAndInitialize(baValue, false, false, null);
++        ObjectChunk baValueSO = (ObjectChunk) sma.allocateAndInitialize(baValue, false, false);
+         OldValueImporter omsg = createImporter();
+         omsg.importOldObject(baValueSO, false);
+         hdos = new HeapDataOutputStream(bytes);
+         toData(omsg, hdos);
+         fromData(imsg, bytes);
+         assertArrayEquals(baValue, (byte[])getOldValueFromImporter(imsg));
+       } finally {
+         SimpleMemoryAllocatorImpl.freeOffHeapMemory();
+       }
+     }
+     // off-heap DataAsAddress String
+     {
+       SimpleMemoryAllocatorImpl sma =
 -          SimpleMemoryAllocatorImpl.create(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{new UnsafeMemoryChunk(1024*1024)});
++          SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{new UnsafeMemoryChunk(1024*1024)});
+       try {
+         String baValue = "12";
+         byte[] baValueBlob = BlobHelper.serializeToBlob(baValue);
 -        DataAsAddress baValueSO = (DataAsAddress) sma.allocateAndInitialize(baValueBlob, true, false, null);
++        DataAsAddress baValueSO = (DataAsAddress) sma.allocateAndInitialize(baValueBlob, true, false);
+         OldValueImporter omsg = createImporter();
+         omsg.importOldObject(baValueSO, true);
+         hdos = new HeapDataOutputStream(bytes);
+         toData(omsg, hdos);
+         fromData(imsg, bytes);
+         assertArrayEquals(baValueBlob, ((VMCachedDeserializable)getOldValueFromImporter(imsg)).getSerializedValue());
+       } finally {
+         SimpleMemoryAllocatorImpl.freeOffHeapMemory();
+       }
+     }
+     // off-heap Chunk String
+     {
+       SimpleMemoryAllocatorImpl sma =
 -          SimpleMemoryAllocatorImpl.create(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{new UnsafeMemoryChunk(1024*1024)});
++          SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{new UnsafeMemoryChunk(1024*1024)});
+       try {
+         String baValue = "12345678";
+         byte[] baValueBlob = BlobHelper.serializeToBlob(baValue);
 -        Chunk baValueSO = (Chunk) sma.allocateAndInitialize(baValueBlob, true, false, null);
++        ObjectChunk baValueSO = (ObjectChunk) sma.allocateAndInitialize(baValueBlob, true, false);
+         OldValueImporter omsg = createImporter();
+         omsg.importOldObject(baValueSO, true);
+         hdos = new HeapDataOutputStream(bytes);
+         toData(omsg, hdos);
+         fromData(imsg, bytes);
+         assertArrayEquals(baValueBlob, ((VMCachedDeserializable)getOldValueFromImporter(imsg)).getSerializedValue());
+       } finally {
+         SimpleMemoryAllocatorImpl.freeOffHeapMemory();
+       }
+     }
+   }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/MessageJUnitTest.java
----------------------------------------------------------------------
diff --cc geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/MessageJUnitTest.java
index 0000000,3dc5a7d..b7bd47a
mode 000000,100755..100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/MessageJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/MessageJUnitTest.java
@@@ -1,0 -1,75 +1,112 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package com.gemstone.gemfire.internal.cache.tier.sockets;
+ 
+ import static org.junit.Assert.*;
+ import static org.mockito.Matchers.*;
+ import static org.mockito.Mockito.*;
+ 
+ import java.net.Socket;
+ import java.nio.ByteBuffer;
+ 
+ import org.junit.Before;
+ import org.junit.Test;
+ import org.junit.experimental.categories.Category;
+ 
+ import com.gemstone.gemfire.internal.Version;
+ import com.gemstone.gemfire.internal.offheap.HeapByteBufferMemoryChunkJUnitTest;
+ import com.gemstone.gemfire.test.junit.categories.UnitTest;
+ 
+ @Category(UnitTest.class)
+ public class MessageJUnitTest {
+ 
+   Message message;
+   Socket mockSocket;
+   MessageStats mockStats;
+   ByteBuffer msgBuffer;
++  ServerConnection mockServerConnection;
+   
+   @Before
+   public void setUp() throws Exception {
+     mockSocket = mock(Socket.class);
 -    message = new Message(5, Version.CURRENT);
 -    assertEquals(5, message.getNumberOfParts());
++    message = new Message(2, Version.CURRENT);
++    assertEquals(2, message.getNumberOfParts());
+     mockStats = mock(MessageStats.class);
+     msgBuffer = ByteBuffer.allocate(1000);
 -    message.setComms(mockSocket, msgBuffer, mockStats);
++    mockServerConnection = mock(ServerConnection.class);
++    message.setComms(mockServerConnection, mockSocket, msgBuffer, mockStats);
+   }
+ 
+   @Test
+   public void clearDoesNotThrowNPE() throws Exception{
+     // unsetComms clears the message's ByteBuffer, which was causing an NPE during shutdown
+     // when clear() was invoked
+     message.unsetComms();
+     message.clear();
+   }
+   
+   @Test
+   public void numberOfPartsIsAdjusted() {
+     int numParts = message.getNumberOfParts();
 -    message.setNumberOfParts(2*numParts);
 -    assertEquals(2*numParts, message.getNumberOfParts());
++    message.setNumberOfParts(2*numParts+1);
++    assertEquals(2*numParts+1, message.getNumberOfParts());
+     message.addBytesPart(new byte[1]);
+     message.addIntPart(2);
+     message.addLongPart(3);
+     message.addObjPart("4");
+     message.addStringPart("5");
+     assertEquals(5, message.getNextPartNumber());
+   }
+   
++  @Test
++  public void messageLongerThanMaxIntIsRejected() throws Exception {
++    Part[] parts = new Part[2];
++    Part mockPart1 = mock(Part.class);
++    when(mockPart1.getLength()).thenReturn(Integer.MAX_VALUE/2);
++    parts[0] = mockPart1;
++    parts[1] = mockPart1;
++    message.setParts(parts);
++    try {
++      message.send();
++    } catch (MessageTooLargeException e) {
++      assertTrue(e.getMessage().contains("exceeds maximum integer value"));
++      return;
++    }
++    fail("expected an exception but none was thrown");
++  }
++  
++  @Test
++  public void maxMessageSizeIsRespected() throws Exception {
++    Part[] parts = new Part[2];
++    Part mockPart1 = mock(Part.class);
++    when(mockPart1.getLength()).thenReturn(Message.MAX_MESSAGE_SIZE/2);
++    parts[0] = mockPart1;
++    parts[1] = mockPart1;
++    message.setParts(parts);
++    try {
++      message.send();
++    } catch (MessageTooLargeException e) {
++      assertFalse(e.getMessage().contains("exceeds maximum integer value"));
++      return;
++    }
++    fail("expected an exception but none was thrown");
++  }
++  
++  
+   // TODO many more tests are needed
+ 
+ }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/FragmentJUnitTest.java
----------------------------------------------------------------------
diff --cc geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/FragmentJUnitTest.java
index 0000000,ccc6b67..54eac9e
mode 000000,100644..100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/FragmentJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/FragmentJUnitTest.java
@@@ -1,0 -1,280 +1,238 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package com.gemstone.gemfire.internal.offheap;
+ 
+ import static org.junit.Assert.*;
 -import static org.mockito.Mockito.mock;
+ import static org.hamcrest.CoreMatchers.*;
+ 
+ import java.util.Arrays;
+ 
+ import org.assertj.core.api.JUnitSoftAssertions;
+ import org.junit.After;
+ import org.junit.AfterClass;
+ import org.junit.Before;
+ import org.junit.BeforeClass;
+ import org.junit.Rule;
+ import org.junit.Test;
+ import org.junit.experimental.categories.Category;
+ import org.junit.rules.ExpectedException;
+ 
 -import com.gemstone.gemfire.LogWriter;
+ import com.gemstone.gemfire.test.junit.categories.UnitTest;
+ 
+ @Category(UnitTest.class)
+ public class FragmentJUnitTest {
+ 
 -  private SimpleMemoryAllocatorImpl ma;
 -  private OutOfOffHeapMemoryListener ooohml;
 -  private OffHeapMemoryStats stats;
 -  private LogWriter lw;
 -  private UnsafeMemoryChunk.Factory umcFactory;
+   private UnsafeMemoryChunk[] slabs;
 -  private int numSlabs;
+ 
+   static {
+     ClassLoader.getSystemClassLoader().setDefaultAssertionStatus(true);
+   }
+   
+   @Rule
+   public ExpectedException expectedException = ExpectedException.none();
+   
+   @Rule
+   public JUnitSoftAssertions softly = new JUnitSoftAssertions();
+ 
+ 
+   @BeforeClass
+   public static void setUpBeforeClass() throws Exception {
+   }
+ 
+   @AfterClass
+   public static void tearDownAfterClass() throws Exception {
+   }
+ 
+   @Before
+   public void setUp() throws Exception {
 -    ooohml = mock(OutOfOffHeapMemoryListener.class);
 -    stats = mock(OffHeapMemoryStats.class);
 -    lw = mock(LogWriter.class);
 -    
 -    numSlabs = 2;
 -    umcFactory = new UnsafeMemoryChunk.Factory(){
 -      @Override
 -      public UnsafeMemoryChunk create(int size) {
 -        return new UnsafeMemoryChunk(size);
 -      }
 -    };
 -    slabs = allocateMemorySlabs();
++    UnsafeMemoryChunk slab1 = new UnsafeMemoryChunk((int)OffHeapStorage.MIN_SLAB_SIZE);
++    UnsafeMemoryChunk slab2 = new UnsafeMemoryChunk((int)OffHeapStorage.MIN_SLAB_SIZE);
++    slabs = new UnsafeMemoryChunk[]{slab1, slab2};
+   }
+ 
+   @After
+   public void tearDown() throws Exception {
 -    SimpleMemoryAllocatorImpl.freeOffHeapMemory();
++    for (int i=0; i < slabs.length; i++) {
++      slabs[i].release();
++    }
+   }
+   
 -  private UnsafeMemoryChunk[] allocateMemorySlabs() {
 -    ma = SimpleMemoryAllocatorImpl.create(ooohml, stats, lw, numSlabs, OffHeapStorage.MIN_SLAB_SIZE * numSlabs, OffHeapStorage.MIN_SLAB_SIZE, umcFactory);
 -    return ma.getSlabs();
 -  }
 -
+   
+   @Test
+   public void fragmentConstructorThrowsExceptionForNon8ByteAlignedAddress() {
+       expectedException.expect(IllegalStateException.class);
+       expectedException.expectMessage("address was not 8 byte aligned");
+ 
+       new Fragment(slabs[0].getMemoryAddress() + 2, 0);
+       fail("Constructor failed to throw exception for non-8-byte alignment");
+   }
+ 
+   @Test
+   public void zeroSizeFragmentHasNoFreeSpace() {
+     Fragment fragment = new Fragment(slabs[0].getMemoryAddress(), 0);
+     assertThat(fragment.freeSpace(), is(0));
+   }
+ 
+   @Test
+   public void unallocatedFragmentHasFreeSpaceEqualToFragmentSize() {
+     Fragment fragment = new Fragment(slabs[0].getMemoryAddress(), slabs[0].getSize());
+     softly.assertThat(fragment.getSize()).isEqualTo((int)OffHeapStorage.MIN_SLAB_SIZE);
+     softly.assertThat(fragment.freeSpace()).isEqualTo((int)OffHeapStorage.MIN_SLAB_SIZE);
+   }
+ 
+   @Test
+   public void allocatingFromFragmentReducesFreeSpace() {
+     Fragment fragment = new Fragment(slabs[0].getMemoryAddress(), slabs[0].getSize());
+     softly.assertThat(fragment.allocate(fragment.getFreeIndex(), fragment.getFreeIndex() + 256)).isEqualTo(true);
+     softly.assertThat(fragment.freeSpace()).isEqualTo(768);
+     softly.assertThat(fragment.getFreeIndex()).isEqualTo(256);
+   }
+ 
+   @Test
+   public void fragementAllocationIsUnsafeWithRespectToAllocationSize() {
+     Fragment fragment = new Fragment(slabs[0].getMemoryAddress(), slabs[0].getSize());
+     softly.assertThat(fragment.allocate(fragment.getFreeIndex(), fragment.getFreeIndex() + (int)OffHeapStorage.MIN_SLAB_SIZE + 8)).isEqualTo(true);
+     softly.assertThat(fragment.freeSpace()).isEqualTo(-8);
+   }
+ 
+   @Test
+   public void getBlockSizeReturnsFreeSpace() {
+     Fragment fragment = new Fragment(slabs[0].getMemoryAddress(), slabs[0].getSize());
+     softly.assertThat(fragment.allocate(fragment.getFreeIndex(), fragment.getFreeIndex() + 256)).isEqualTo(true);
+     softly.assertThat(fragment.getBlockSize()).isEqualTo(fragment.freeSpace());
+   }
+ 
+   @Test
+   public void getMemoryAdressIsAlwaysFragmentBaseAddress() {
+     Fragment fragment = new Fragment(slabs[0].getMemoryAddress(), slabs[0].getSize());
+     softly.assertThat(fragment.getMemoryAddress()).isEqualTo(slabs[0].getMemoryAddress());
+     fragment.allocate(fragment.getFreeIndex(), fragment.getFreeIndex() + 256);
+     softly.assertThat(fragment.getMemoryAddress()).isEqualTo(slabs[0].getMemoryAddress());
+   }
+   
+   @Test
+   public void getStateIsAlwaysStateUNUSED() {
 -    slabs = allocateMemorySlabs();
+     Fragment fragment = new Fragment(slabs[0].getMemoryAddress(), slabs[0].getSize());
+     softly.assertThat(fragment.getState()).isEqualTo(MemoryBlock.State.UNUSED);
+     fragment.allocate(fragment.getFreeIndex(), fragment.getFreeIndex() + 256);
+     softly.assertThat(fragment.getState()).isEqualTo(MemoryBlock.State.UNUSED);
+   }
+ 
+   @Test
+   public void getFreeListIdIsAlwaysMinus1() {
 -    slabs = allocateMemorySlabs();
+     Fragment fragment = new Fragment(slabs[0].getMemoryAddress(), slabs[0].getSize());
+     softly.assertThat(fragment.getFreeListId()).isEqualTo(-1);
+     fragment.allocate(fragment.getFreeIndex(), fragment.getFreeIndex() + 256);
+     softly.assertThat(fragment.getFreeListId()).isEqualTo(-1);
+   }
+ 
+   @Test
+   public void getRefCountIsAlwaysZero() {
 -    slabs = allocateMemorySlabs();
+     Fragment fragment = new Fragment(slabs[0].getMemoryAddress(), slabs[0].getSize());
+     softly.assertThat(fragment.getRefCount()).isEqualTo(0);
+     fragment.allocate(fragment.getFreeIndex(), fragment.getFreeIndex() + 256);
+     softly.assertThat(fragment.getRefCount()).isEqualTo(0);
+   }
+ 
+   @Test
+   public void getDataTypeIsAlwaysNA() {
 -    slabs = allocateMemorySlabs();
+     Fragment fragment = new Fragment(slabs[0].getMemoryAddress(), slabs[0].getSize());
+     softly.assertThat(fragment.getDataType()).isEqualTo("N/A");
+     fragment.allocate(fragment.getFreeIndex(), fragment.getFreeIndex() + 256);
+     softly.assertThat(fragment.getDataType()).isEqualTo("N/A");
+   }
+ 
+   @Test
+   public void isSerializedIsAlwaysFalse() {
 -    slabs = allocateMemorySlabs();
+     Fragment fragment = new Fragment(slabs[0].getMemoryAddress(), slabs[0].getSize());
+     softly.assertThat(fragment.isSerialized()).isEqualTo(false);
+     fragment.allocate(fragment.getFreeIndex(), fragment.getFreeIndex() + 256);
+     softly.assertThat(fragment.isSerialized()).isEqualTo(false);
+   }
+ 
+   @Test
+   public void isCompressedIsAlwaysFalse() {
 -    slabs = allocateMemorySlabs();
+     Fragment fragment = new Fragment(slabs[0].getMemoryAddress(), slabs[0].getSize());
+     softly.assertThat(fragment.isCompressed()).isEqualTo(false);
+     fragment.allocate(fragment.getFreeIndex(), fragment.getFreeIndex() + 256);
+     softly.assertThat(fragment.isCompressed()).isEqualTo(false);
+   }
+ 
+   @Test
+   public void getDataValueIsAlwaysNull() {
 -    slabs = allocateMemorySlabs();
+     Fragment fragment = new Fragment(slabs[0].getMemoryAddress(), slabs[0].getSize());
+     softly.assertThat(fragment.getDataValue()).isNull();
+     fragment.allocate(fragment.getFreeIndex(), fragment.getFreeIndex() + 256);
+     softly.assertThat(fragment.getDataValue()).isNull();
+   }
+ 
+   @Test
 -  public void getChunkTypeIsAlwaysNull() {
 -    slabs = allocateMemorySlabs();
 -    Fragment fragment = new Fragment(slabs[0].getMemoryAddress(), slabs[0].getSize());
 -    softly.assertThat(fragment.getChunkType()).isNull();
 -    fragment.allocate(fragment.getFreeIndex(), fragment.getFreeIndex() + 256);
 -    softly.assertThat(fragment.getChunkType()).isNull();
 -  }
 -
 -  @Test
+   public void fragmentEqualsComparesMemoryBlockAddresses() {
 -    slabs = allocateMemorySlabs();
+     Fragment fragment0 = new Fragment(slabs[0].getMemoryAddress(), slabs[0].getSize());
+     Fragment sameFragment = new Fragment(slabs[0].getMemoryAddress(), slabs[0].getSize());
+     Fragment fragment1 = new Fragment(slabs[1].getMemoryAddress(), slabs[1].getSize());
+     softly.assertThat(fragment0.equals(sameFragment)).isEqualTo(true);
+     softly.assertThat(fragment0.equals(fragment1)).isEqualTo(false);
+   }
+ 
+   @Test
+   public void fragmentEqualsIsFalseForNonFragmentObjects() {
 -    slabs = allocateMemorySlabs();
+     Fragment fragment0 = new Fragment(slabs[0].getMemoryAddress(), slabs[0].getSize());
+     assertThat(fragment0.equals(slabs[0]), is(false));
+   }
+ 
+   @Test
+   public void fragmentHashCodeIsHashCodeOfItsMemoryAddress() {
 -    slabs = allocateMemorySlabs();
+     Fragment fragment0 = new Fragment(slabs[0].getMemoryAddress(), slabs[0].getSize());
+     Fragment fragment1 = new Fragment(slabs[1].getMemoryAddress(), slabs[1].getSize());
+     Long fragmentAddress = fragment0.getMemoryAddress();
+     softly.assertThat(fragment0.hashCode()).isEqualTo(fragmentAddress.hashCode())
+                                            .isNotEqualTo(fragment1.hashCode());
+   }
+ 
+   @Test
+   public void fragmentFillSetsAllBytesToTheSameConstantValue() {
 -    slabs = allocateMemorySlabs();
+     Fragment fragment = new Fragment(slabs[0].getMemoryAddress(), slabs[0].getSize());
+     Long fragmentAddress = fragment.getMemoryAddress();
+     byte[] bytes = new byte[(int)OffHeapStorage.MIN_SLAB_SIZE];
+     byte[] expectedBytes = new byte[(int)OffHeapStorage.MIN_SLAB_SIZE];
 -    Arrays.fill(expectedBytes, Chunk.FILL_BYTE);;
++    Arrays.fill(expectedBytes, ObjectChunk.FILL_BYTE);;
+     fragment.fill();
+     UnsafeMemoryChunk.readAbsoluteBytes(fragmentAddress, bytes, 0, (int)OffHeapStorage.MIN_SLAB_SIZE);
+     assertThat(bytes, is(equalTo(expectedBytes)));
+   }
+ 
+   @Test
+   public void getNextBlockThrowsExceptionForFragment() {
+     expectedException.expect(UnsupportedOperationException.class);
+ 
 -    slabs = allocateMemorySlabs();
+     Fragment fragment = new Fragment(slabs[0].getMemoryAddress(), slabs[0].getSize());
+     fragment.getNextBlock();
+     fail("getNextBlock failed to throw UnsupportedOperationException");
+   }
+ 
+   @Test
+   public void getSlabIdThrowsExceptionForFragment() {
+     expectedException.expect(UnsupportedOperationException.class);
+ 
 -    slabs = allocateMemorySlabs();
+     Fragment fragment = new Fragment(slabs[0].getMemoryAddress(), slabs[0].getSize());
+     fragment.getSlabId();
+     fail("getSlabId failed to throw UnsupportedOperationException");
+   }
+   
+ }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/FreeListOffHeapRegionJUnitTest.java
----------------------------------------------------------------------
diff --cc geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/FreeListOffHeapRegionJUnitTest.java
index 0000000,93f2039..6790f6a
mode 000000,100644..100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/FreeListOffHeapRegionJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/FreeListOffHeapRegionJUnitTest.java
@@@ -1,0 -1,46 +1,46 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package com.gemstone.gemfire.internal.offheap;
+ 
+ import org.junit.experimental.categories.Category;
+ 
+ import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+ 
+ @Category(IntegrationTest.class)
+ public class FreeListOffHeapRegionJUnitTest extends OffHeapRegionBase {
+ 
+   @Override
+   protected String getOffHeapMemorySize() {
+     return "20m";
+   }
+   
+   @Override
+   public void configureOffHeapStorage() {
+     System.setProperty("gemfire.OFF_HEAP_SLAB_SIZE", "1m");
+   }
+ 
+   @Override
+   public void unconfigureOffHeapStorage() {
+     System.clearProperty("gemfire.OFF_HEAP_SLAB_SIZE");
+   }
+ 
+   @Override
+   public int perObjectOverhead() {
 -    return Chunk.OFF_HEAP_HEADER_SIZE;
++    return ObjectChunk.OFF_HEAP_HEADER_SIZE;
+   }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/LifecycleListenerJUnitTest.java
----------------------------------------------------------------------
diff --cc geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/LifecycleListenerJUnitTest.java
index 0000000,5e54b73..97ae486
mode 000000,100755..100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/LifecycleListenerJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/LifecycleListenerJUnitTest.java
@@@ -1,0 -1,230 +1,230 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package com.gemstone.gemfire.internal.offheap;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.fail;
+ 
+ import java.io.PrintWriter;
+ import java.io.StringWriter;
+ import java.util.ArrayList;
+ import java.util.List;
+ 
+ import org.junit.After;
+ import org.junit.Assert;
+ import org.junit.Rule;
+ import org.junit.Test;
+ import org.junit.contrib.java.lang.system.RestoreSystemProperties;
+ import org.junit.experimental.categories.Category;
+ 
+ import com.gemstone.gemfire.test.junit.categories.UnitTest;
+ 
+ /**
+  * Tests LifecycleListener
+  * 
+  * @author Kirk Lund
+  */
+ @Category(UnitTest.class)
+ public class LifecycleListenerJUnitTest {
+   @Rule
+   public final RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();
+ 
+   private final List<LifecycleListenerCallback> afterCreateCallbacks = new ArrayList<LifecycleListenerCallback>();
+   private final List<LifecycleListenerCallback> afterReuseCallbacks = new ArrayList<LifecycleListenerCallback>();
+   private final List<LifecycleListenerCallback> beforeCloseCallbacks = new ArrayList<LifecycleListenerCallback>();
+   private final TestLifecycleListener listener = new TestLifecycleListener(this.afterCreateCallbacks, this.afterReuseCallbacks, this.beforeCloseCallbacks);
+ 
+   @After
+   public void tearDown() throws Exception { // per-test cleanup: detach the listener and forget every recorded callback
+     LifecycleListener.removeLifecycleListener(this.listener);
+     this.afterCreateCallbacks.clear();
+     this.afterReuseCallbacks.clear();
+     this.beforeCloseCallbacks.clear();
+     SimpleMemoryAllocatorImpl.freeOffHeapMemory(); // presumably frees whatever allocator a test left behind -- confirm against SimpleMemoryAllocatorImpl
+   }
+ 
+   @Test
+   public void testAddRemoveListener() { // a listener that was added and removed again must not record any allocator lifecycle callback
+     LifecycleListener.addLifecycleListener(this.listener);
+     LifecycleListener.removeLifecycleListener(this.listener);
+ 
+     UnsafeMemoryChunk slab = new UnsafeMemoryChunk(1024); // 1k
 -    SimpleMemoryAllocatorImpl ma = SimpleMemoryAllocatorImpl.create(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(),
++    SimpleMemoryAllocatorImpl ma = SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(),
+         new UnsafeMemoryChunk[] { slab });
+ 
+     Assert.assertEquals(0, this.afterCreateCallbacks.size());
+     Assert.assertEquals(0, this.afterReuseCallbacks.size());
+     Assert.assertEquals(0, this.beforeCloseCallbacks.size());
+ 
+     ma.close();
+ 
+     Assert.assertEquals(0, this.afterCreateCallbacks.size());
+     Assert.assertEquals(0, this.afterReuseCallbacks.size());
+     Assert.assertEquals(0, this.beforeCloseCallbacks.size());
+ 
+     LifecycleListener.removeLifecycleListener(this.listener); // second removal of the same listener; the test expects this call to be harmless
+   }
+ 
+   @Test
+   public void testCallbacksAreCalledAfterCreate() { // a registered listener records one afterCreate on creation and one beforeClose on close
+     LifecycleListener.addLifecycleListener(this.listener);
+     UnsafeMemoryChunk slab = new UnsafeMemoryChunk(1024); // 1k
 -    SimpleMemoryAllocatorImpl ma = SimpleMemoryAllocatorImpl.create(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(),
++    SimpleMemoryAllocatorImpl ma = SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(),
+         new UnsafeMemoryChunk[] { slab });
+ 
+     Assert.assertEquals(1, this.afterCreateCallbacks.size());
+     Assert.assertEquals(0, this.afterReuseCallbacks.size());
+     Assert.assertEquals(0, this.beforeCloseCallbacks.size());
+ 
+     closeAndFree(ma); // closes with FREE_OFF_HEAP_MEMORY_PROPERTY set to "true"
+ 
+     Assert.assertEquals(1, this.afterCreateCallbacks.size());
+     Assert.assertEquals(0, this.afterReuseCallbacks.size());
+     Assert.assertEquals(1, this.beforeCloseCallbacks.size());
+     
+     LifecycleListener.removeLifecycleListener(this.listener); // tearDown() removes the listener as well
+   }
+ 
+   @Test
+   public void testCallbacksAreCalledAfterReuse() { // with FREE_OFF_HEAP_MEMORY_PROPERTY "false", creating again after close() is recorded as afterReuse, not afterCreate
+ 
+     LifecycleListener.addLifecycleListener(this.listener);
+ 
+     System.setProperty(SimpleMemoryAllocatorImpl.FREE_OFF_HEAP_MEMORY_PROPERTY, "false"); // the RestoreSystemProperties rule on this class restores properties after the test
+ 
+     UnsafeMemoryChunk slab = new UnsafeMemoryChunk(1024); // 1k
+     SimpleMemoryAllocatorImpl ma = createAllocator(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[] { slab });
+ 
+     Assert.assertEquals(1, this.afterCreateCallbacks.size());
+     Assert.assertEquals(0, this.afterReuseCallbacks.size());
+     Assert.assertEquals(0, this.beforeCloseCallbacks.size());
+ 
+     ma.close();
+ 
+     Assert.assertEquals(1, this.afterCreateCallbacks.size());
+     Assert.assertEquals(0, this.afterReuseCallbacks.size());
+     Assert.assertEquals(1, this.beforeCloseCallbacks.size());
+ 
+     ma = createAllocator(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), null); // no slabs passed; the assertions below expect this to count as a reuse
+ 
+     Assert.assertEquals(1, this.afterCreateCallbacks.size());
+     Assert.assertEquals(1, this.afterReuseCallbacks.size());
+     Assert.assertEquals(1, this.beforeCloseCallbacks.size());
+ 
+     SimpleMemoryAllocatorImpl ma2 = createAllocator(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[] { slab });
+     assertEquals(null, ma2); // createAllocator returns null when creation throws IllegalStateException, which is what this create is expected to do
+     
+     Assert.assertEquals(1, this.afterCreateCallbacks.size());
+     Assert.assertEquals(1, this.afterReuseCallbacks.size());
+     Assert.assertEquals(1, this.beforeCloseCallbacks.size());
+ 
+     ma.close();
+ 
+     Assert.assertEquals(1, this.afterCreateCallbacks.size());
+     Assert.assertEquals(1, this.afterReuseCallbacks.size());
+     Assert.assertEquals(2, this.beforeCloseCallbacks.size());
+   }
+ 
+   private SimpleMemoryAllocatorImpl createAllocator(OutOfOffHeapMemoryListener ooohml, OffHeapMemoryStats ohms, UnsafeMemoryChunk[] slab) { // creates an allocator, mapping a failed creation to null
+     try {
 -       return SimpleMemoryAllocatorImpl.create(ooohml, ohms, slab);
++       return SimpleMemoryAllocatorImpl.createForUnitTest(ooohml, ohms, slab);
+     } catch (IllegalStateException e) {
+       return null; // callers assert on null instead of catching the exception themselves
+     }
+   }
+   
+   private void closeAndFree(SimpleMemoryAllocatorImpl ma) {
+     System.setProperty(SimpleMemoryAllocatorImpl.FREE_OFF_HEAP_MEMORY_PROPERTY, "true");
+     try {
+       ma.close();
+     } finally {
+       System.clearProperty(SimpleMemoryAllocatorImpl.FREE_OFF_HEAP_MEMORY_PROPERTY);
+     }
+   }
+   
+   /**
+    * Creates, closes and frees two allocators one after the other. Because each allocator is
+    * closed through closeAndFree, the second creation is expected to be recorded as another
+    * afterCreate rather than an afterReuse.
+    */
+   @Test
+   public void testCallbacksAreCalledAfterReuseWithFreeTrue() {
+ 
+     LifecycleListener.addLifecycleListener(this.listener);
+ 
+     UnsafeMemoryChunk slab = new UnsafeMemoryChunk(1024); // 1k
 -    SimpleMemoryAllocatorImpl ma = SimpleMemoryAllocatorImpl.create(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[] { slab });
++    SimpleMemoryAllocatorImpl ma = SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[] { slab });
+ 
+     Assert.assertEquals(1, this.afterCreateCallbacks.size());
+     Assert.assertEquals(0, this.afterReuseCallbacks.size());
+     Assert.assertEquals(0, this.beforeCloseCallbacks.size());
+ 
+     closeAndFree(ma);
+ 
+     Assert.assertEquals(1, this.afterCreateCallbacks.size());
+     Assert.assertEquals(0, this.afterReuseCallbacks.size());
+     Assert.assertEquals(1, this.beforeCloseCallbacks.size());
+ 
+     slab = new UnsafeMemoryChunk(1024); // 1k
 -    SimpleMemoryAllocatorImpl ma2 = SimpleMemoryAllocatorImpl.create(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[] { slab });
++    SimpleMemoryAllocatorImpl ma2 = SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[] { slab });
+ 
+     Assert.assertEquals(2, this.afterCreateCallbacks.size());
+     Assert.assertEquals(0, this.afterReuseCallbacks.size());
+     Assert.assertEquals(1, this.beforeCloseCallbacks.size());
+ 
+     // Close the second allocator here; the first one was already closed and freed above.
+     closeAndFree(ma2);
+ 
+     Assert.assertEquals(2, this.afterCreateCallbacks.size());
+     Assert.assertEquals(0, this.afterReuseCallbacks.size());
+     Assert.assertEquals(2, this.beforeCloseCallbacks.size());
+   }
+ 
+   static final class LifecycleListenerCallback { // immutable record of a single listener invocation
+     private final SimpleMemoryAllocatorImpl allocator;
+     private final long timeStamp;
+     private final Throwable creationTime; // despite the name this holds an Exception, not a time; it carries the stack trace of the point where the record was created
+ 
+     LifecycleListenerCallback(SimpleMemoryAllocatorImpl allocator) {
+       this.allocator = allocator;
+       this.timeStamp = System.currentTimeMillis(); // wall-clock time of the callback
+       this.creationTime = new Exception();
+     }
+   }
+ 
+   static class TestLifecycleListener implements LifecycleListener { // listener that appends one record per callback to caller-supplied lists
+     private final List<LifecycleListenerCallback> afterCreateCallbacks;
+     private final List<LifecycleListenerCallback> afterReuseCallbacks;
+     private final List<LifecycleListenerCallback> beforeCloseCallbacks;
+ 
+     TestLifecycleListener(List<LifecycleListenerCallback> afterCreateCallbacks, List<LifecycleListenerCallback> afterReuseCallbacks,
+         List<LifecycleListenerCallback> beforeCloseCallbacks) {
+       this.afterCreateCallbacks = afterCreateCallbacks; // lists are shared with the caller, not copied, so the test can inspect them directly
+       this.afterReuseCallbacks = afterReuseCallbacks;
+       this.beforeCloseCallbacks = beforeCloseCallbacks;
+     }
+ 
+     @Override
+     public void afterCreate(SimpleMemoryAllocatorImpl allocator) {
+       this.afterCreateCallbacks.add(new LifecycleListenerCallback(allocator));
+     }
+ 
+     @Override
+     public void afterReuse(SimpleMemoryAllocatorImpl allocator) {
+       this.afterReuseCallbacks.add(new LifecycleListenerCallback(allocator));
+     }
+ 
+     @Override
+     public void beforeClose(SimpleMemoryAllocatorImpl allocator) {
+       this.beforeCloseCallbacks.add(new LifecycleListenerCallback(allocator));
+     }
+   }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/ObjectChunkJUnitTest.java
----------------------------------------------------------------------
diff --cc geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/ObjectChunkJUnitTest.java
index 0000000,0000000..9271b53
new file mode 100644
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/ObjectChunkJUnitTest.java
@@@ -1,0 -1,0 +1,902 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements.  See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.  You may obtain a copy of the License at
++ *
++ *      http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package com.gemstone.gemfire.internal.offheap;
++
++import static org.assertj.core.api.Assertions.assertThat;
++import static org.junit.Assert.assertArrayEquals;
++import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.assertNotNull;
++import static org.mockito.Mockito.atLeastOnce;
++import static org.mockito.Mockito.doNothing;
++import static org.mockito.Mockito.doReturn;
++import static org.mockito.Mockito.mock;
++import static org.mockito.Mockito.spy;
++import static org.mockito.Mockito.times;
++import static org.mockito.Mockito.verify;
++import static org.mockito.Mockito.when;
++
++import java.io.IOException;
++import java.nio.ByteBuffer;
++
++import org.junit.After;
++import org.junit.Before;
++import org.junit.Test;
++import org.junit.experimental.categories.Category;
++
++import com.gemstone.gemfire.LogWriter;
++import com.gemstone.gemfire.compression.Compressor;
++import com.gemstone.gemfire.internal.DSCODE;
++import com.gemstone.gemfire.internal.HeapDataOutputStream;
++import com.gemstone.gemfire.internal.Version;
++import com.gemstone.gemfire.internal.cache.BytesAndBitsForCompactor;
++import com.gemstone.gemfire.internal.cache.CachePerfStats;
++import com.gemstone.gemfire.internal.cache.EntryEventImpl;
++import com.gemstone.gemfire.internal.cache.RegionEntryContext;
++import com.gemstone.gemfire.internal.offheap.MemoryBlock.State;
++import com.gemstone.gemfire.test.junit.categories.UnitTest;
++
++@Category(UnitTest.class)
++public class ObjectChunkJUnitTest extends AbstractStoredObjectTestBase {
++
++  private MemoryAllocator ma;
++
++  static {
++    ClassLoader.getSystemClassLoader().setDefaultAssertionStatus(true); // turn Java assertions on by default; some tests in this class expect AssertionError
++  }
++
++  @Before
++  public void setUp() { // builds the allocator used by every test, with mocked listener, stats and log writer
++    OutOfOffHeapMemoryListener ooohml = mock(OutOfOffHeapMemoryListener.class);
++    OffHeapMemoryStats stats = mock(OffHeapMemoryStats.class);
++    LogWriter lw = mock(LogWriter.class);
++
++    ma = SimpleMemoryAllocatorImpl.create(ooohml, stats, lw, 3, OffHeapStorage.MIN_SLAB_SIZE * 3, OffHeapStorage.MIN_SLAB_SIZE); // presumably 3 slabs of MIN_SLAB_SIZE each -- confirm parameter order against SimpleMemoryAllocatorImpl.create
++  }
++
++  @After
++  public void tearDown() {
++    SimpleMemoryAllocatorImpl.freeOffHeapMemory();
++  }
++
++  @Override
++  public Object getValue() {
++    return Long.valueOf(Long.MAX_VALUE);
++  }
++
++  @Override
++  public byte[] getValueAsByteArray() {
++    return convertValueToByteArray(getValue());
++  }
++
++  private byte[] convertValueToByteArray(Object value) {
++    return ByteBuffer.allocate(Long.SIZE / Byte.SIZE).putLong((Long) value).array();
++  }
++
++  @Override
++  public Object convertByteArrayToObject(byte[] valueInByteArray) {
++    return ByteBuffer.wrap(valueInByteArray).getLong();
++  }
++
++  @Override
++  public Object convertSerializedByteArrayToObject(byte[] valueInSerializedByteArray) {
++    return EntryEventImpl.deserialize(valueInSerializedByteArray);
++  }
++
++  /**
++   * Stores the given value off-heap with neither the serialized nor the compressed flag set.
++   *
++   * @param value a {@link Long}, which is turned into bytes by convertValueToByteArray, or
++   *        otherwise a {@code byte[]} that is stored as is
++   * @return the chunk holding the bytes
++   */
++  @Override
++  public ObjectChunk createValueAsUnserializedStoredObject(Object value) {
++    final byte[] rawBytes = (value instanceof Long) ? convertValueToByteArray(value) : (byte[]) value;
++    return createChunk(rawBytes, false, false);
++  }
++
++  /**
++   * Serializes the given value with {@code EntryEventImpl.serialize} and stores the result
++   * off-heap with the serialized flag set and the compressed flag clear.
++   *
++   * @param value the object to serialize and store
++   * @return the chunk holding the serialized bytes
++   */
++  @Override
++  public ObjectChunk createValueAsSerializedStoredObject(Object value) {
++    final byte[] serializedBytes = EntryEventImpl.serialize(value);
++    return createChunk(serializedBytes, true, false);
++  }
++
++  /**
++   * Allocates a chunk from the test allocator and initializes it with the given bytes and flags.
++   *
++   * @param v the bytes to store
++   * @param isSerialized value passed through as the serialized flag
++   * @param isCompressed value passed through as the compressed flag
++   * @return the allocated chunk, cast to {@link ObjectChunk}
++   */
++  private ObjectChunk createChunk(byte[] v, boolean isSerialized, boolean isCompressed) {
++    return (ObjectChunk) ma.allocateAndInitialize(v, isSerialized, isCompressed);
++  }
++
++  @Test
++  public void chunkCanBeCreatedFromAnotherChunk() {
++    ObjectChunk chunk = createValueAsUnserializedStoredObject(getValue());
++
++    ObjectChunk newChunk = new ObjectChunk(chunk);
++
++    assertNotNull(newChunk);
++    assertThat(newChunk.getMemoryAddress()).isEqualTo(chunk.getMemoryAddress());
++
++    chunk.release();
++  }
++
++  @Test
++  public void chunkCanBeCreatedWithOnlyMemoryAddress() {
++    ObjectChunk chunk = createValueAsUnserializedStoredObject(getValue());
++
++    ObjectChunk newChunk = new ObjectChunk(chunk.getMemoryAddress());
++
++    assertNotNull(newChunk);
++    assertThat(newChunk.getMemoryAddress()).isEqualTo(chunk.getMemoryAddress());
++
++    chunk.release();
++  }
++
++  @Test
++  public void chunkSliceCanBeCreatedFromAnotherChunk() {
++    ObjectChunk chunk = createValueAsUnserializedStoredObject(getValue());
++
++    int position = 1;
++    int end = 2;
++
++    ObjectChunk newChunk = (ObjectChunk) chunk.slice(position, end);
++
++    assertNotNull(newChunk);
++    assertThat(newChunk.getClass()).isEqualTo(ObjectChunkSlice.class);
++    assertThat(newChunk.getMemoryAddress()).isEqualTo(chunk.getMemoryAddress());
++
++    chunk.release();
++  }
++
++  @Test
++  public void fillSerializedValueShouldFillWrapperWithSerializedValueIfValueIsSerialized() {
++    ObjectChunk chunk = createValueAsSerializedStoredObject(getValue());
++
++    // mock the things
++    BytesAndBitsForCompactor wrapper = mock(BytesAndBitsForCompactor.class);
++
++    byte userBits = 0;
++    byte serializedUserBits = 1;
++    chunk.fillSerializedValue(wrapper, userBits);
++
++    verify(wrapper, times(1)).setChunkData(chunk, serializedUserBits);
++
++    chunk.release();
++  }
++
++  @Test
++  public void fillSerializedValueShouldFillWrapperWithDeserializedValueIfValueIsNotSerialized() {
++    ObjectChunk chunk = createValueAsUnserializedStoredObject(getValue());
++
++    // mock the things
++    BytesAndBitsForCompactor wrapper = mock(BytesAndBitsForCompactor.class);
++
++    byte userBits = 1;
++    chunk.fillSerializedValue(wrapper, userBits);
++
++    verify(wrapper, times(1)).setChunkData(chunk, userBits);
++
++    chunk.release();
++  }
++
++  @Test
++  public void getShortClassNameShouldReturnShortClassName() {
++    ObjectChunk chunk = createValueAsUnserializedStoredObject(getValue());
++    assertThat(chunk.getShortClassName()).isEqualTo("ObjectChunk");
++
++    chunk.release();
++  }
++
++  @Test
++  public void chunksAreEqualsOnlyByAddress() {
++    ObjectChunk chunk = createValueAsSerializedStoredObject(getValue());
++
++    ObjectChunk newChunk = new ObjectChunk(chunk.getMemoryAddress());
++    assertThat(chunk.equals(newChunk)).isTrue();
++
++    ObjectChunk chunkWithSameValue = createValueAsUnserializedStoredObject(getValue());
++    assertThat(chunk.equals(chunkWithSameValue)).isFalse();
++
++    Object someObject = getValue();
++    assertThat(chunk.equals(someObject)).isFalse();
++
++    chunk.release();
++    chunkWithSameValue.release();
++  }
++
++  /**
++   * Checks compareTo for a chunk against itself and against other chunks that hold the same
++   * value. For the latter the expected result is the sign of the address difference.
++   */
++  @Test
++  public void chunksShouldBeComparedBySize() {
++    ObjectChunk chunk1 = createValueAsSerializedStoredObject(getValue());
++
++    ObjectChunk chunk2 = chunk1;
++    assertThat(chunk1.compareTo(chunk2)).isEqualTo(0);
++
++    ObjectChunk chunkWithSameValue = createValueAsSerializedStoredObject(getValue());
++    assertThat(chunk1.compareTo(chunkWithSameValue)).isEqualTo(Long.signum(chunk1.getMemoryAddress() - chunkWithSameValue.getMemoryAddress()));
++
++    ObjectChunk chunk3 = createValueAsSerializedStoredObject(Long.MAX_VALUE);
++    ObjectChunk chunk4 = createValueAsSerializedStoredObject(Long.MAX_VALUE);
++
++    // chunk3 and chunk4 hold the same value, so the expectation follows the address order exactly
++    // as in the assertion above, instead of hard-coded sizes that were never applied to the chunks.
++    assertThat(chunk3.compareTo(chunk4)).isEqualTo(Long.signum(chunk3.getMemoryAddress() - chunk4.getMemoryAddress()));
++
++    // Release every chunk this test allocated.
++    chunk1.release();
++    chunkWithSameValue.release();
++    chunk3.release();
++    chunk4.release();
++  }
++
++  @Test
++  public void setSerializedShouldSetTheSerializedBit() {
++    Object regionEntryValue = getValue();
++    byte[] regionEntryValueAsBytes = convertValueToByteArray(regionEntryValue);
++
++    boolean isSerialized = false;
++    boolean isCompressed = false;
++
++    ObjectChunk chunk = (ObjectChunk) ma.allocateAndInitialize(regionEntryValueAsBytes, isSerialized, isCompressed);
++
++    int headerBeforeSerializedBitSet = UnsafeMemoryChunk.readAbsoluteIntVolatile(chunk.getMemoryAddress() + ObjectChunk.REF_COUNT_OFFSET);
++
++    assertThat(chunk.isSerialized()).isFalse();
++
++    chunk.setSerialized(true); // set to true
++
++    assertThat(chunk.isSerialized()).isTrue();
++
++    int headerAfterSerializedBitSet = UnsafeMemoryChunk.readAbsoluteIntVolatile(chunk.getMemoryAddress() + ObjectChunk.REF_COUNT_OFFSET);
++
++    assertThat(headerAfterSerializedBitSet).isEqualTo(headerBeforeSerializedBitSet | ObjectChunk.IS_SERIALIZED_BIT);
++
++    chunk.release();
++  }
++
++  /**
++   * Calling setSerialized on a chunk that was already released must raise IllegalStateException.
++   */
++  @Test(expected = IllegalStateException.class)
++  public void setSerialziedShouldThrowExceptionIfChunkIsAlreadyReleased() {
++    ObjectChunk chunk = createValueAsUnserializedStoredObject(getValue());
++    chunk.release();
++    // Deliberately the last statement: a further release() of the already-freed chunk could itself
++    // raise IllegalStateException and hide a setSerialized that failed to throw.
++    chunk.setSerialized(true);
++  }
++
++  @Test
++  public void setCompressedShouldSetTheCompressedBit() {
++    Object regionEntryValue = getValue();
++    byte[] regionEntryValueAsBytes = convertValueToByteArray(regionEntryValue);
++
++    boolean isSerialized = false;
++    boolean isCompressed = false;
++
++    ObjectChunk chunk = (ObjectChunk) ma.allocateAndInitialize(regionEntryValueAsBytes, isSerialized, isCompressed);
++
++    int headerBeforeCompressedBitSet = UnsafeMemoryChunk.readAbsoluteIntVolatile(chunk.getMemoryAddress() + ObjectChunk.REF_COUNT_OFFSET);
++
++    assertThat(chunk.isCompressed()).isFalse();
++
++    chunk.setCompressed(true); // set to true
++
++    assertThat(chunk.isCompressed()).isTrue();
++
++    int headerAfterCompressedBitSet = UnsafeMemoryChunk.readAbsoluteIntVolatile(chunk.getMemoryAddress() + ObjectChunk.REF_COUNT_OFFSET);
++
++    assertThat(headerAfterCompressedBitSet).isEqualTo(headerBeforeCompressedBitSet | ObjectChunk.IS_COMPRESSED_BIT);
++
++    chunk.release();
++  }
++
++  /**
++   * Calling setCompressed on a chunk that was already released must raise IllegalStateException.
++   */
++  @Test(expected = IllegalStateException.class)
++  public void setCompressedShouldThrowExceptionIfChunkIsAlreadyReleased() {
++    ObjectChunk chunk = createValueAsUnserializedStoredObject(getValue());
++    chunk.release();
++    // Deliberately the last statement: a further release() of the already-freed chunk could itself
++    // raise IllegalStateException and hide a setCompressed that failed to throw.
++    chunk.setCompressed(true);
++  }
++
++  @Test
++  public void setDataSizeShouldSetTheDataSizeBits() {
++    ObjectChunk chunk = createValueAsUnserializedStoredObject(getValue());
++
++    int beforeSize = chunk.getDataSize();
++
++    chunk.setDataSize(2);
++
++    int afterSize = chunk.getDataSize();
++
++    assertThat(afterSize).isEqualTo(2);
++    assertThat(afterSize).isNotEqualTo(beforeSize);
++
++    chunk.release();
++  }
++
++  /**
++   * Calling setDataSize on a chunk that was already released must raise IllegalStateException.
++   */
++  @Test(expected = IllegalStateException.class)
++  public void setDataSizeShouldThrowExceptionIfChunkIsAlreadyReleased() {
++    ObjectChunk chunk = createValueAsUnserializedStoredObject(getValue());
++    chunk.release();
++    // Deliberately the last statement: a further release() of the already-freed chunk could itself
++    // raise IllegalStateException and hide a setDataSize that failed to throw.
++    chunk.setDataSize(1);
++  }
++
++  @Test(expected = IllegalStateException.class)
++  public void initializeUseCountShouldThrowIllegalStateExceptionIfChunkIsAlreadyRetained() {
++    ObjectChunk chunk = createValueAsUnserializedStoredObject(getValue());
++    chunk.retain();
++    chunk.initializeUseCount();
++
++    chunk.release();
++  }
++
++  /**
++   * Calling initializeUseCount on a chunk that was already released must raise
++   * IllegalStateException.
++   */
++  @Test(expected = IllegalStateException.class)
++  public void initializeUseCountShouldThrowIllegalStateExceptionIfChunkIsAlreadyReleased() {
++    ObjectChunk chunk = createValueAsUnserializedStoredObject(getValue());
++    chunk.release();
++    // Deliberately the last statement: a further release() of the already-freed chunk could itself
++    // raise IllegalStateException and hide an initializeUseCount that failed to throw.
++    chunk.initializeUseCount();
++  }
++
++  /**
++   * Overwrites the first serialized byte with each of DSCODE.PDX, DSCODE.PDX_ENUM and
++   * DSCODE.PDX_INLINE_ENUM in turn and expects isSerializedPdxInstance to report true each time.
++   */
++  @Test
++  public void isSerializedPdxInstanceShouldReturnTrueIfItsPDXInstance() {
++    ObjectChunk chunk = createValueAsSerializedStoredObject(getValue());
++
++    final byte[] pdxHeaderBytes = { DSCODE.PDX, DSCODE.PDX_ENUM, DSCODE.PDX_INLINE_ENUM };
++    for (byte headerByte : pdxHeaderBytes) {
++      // Re-read the stored bytes on every pass, then write them back with the new leading byte.
++      byte[] serializedValue = chunk.getSerializedValue();
++      serializedValue[0] = headerByte;
++      chunk.setSerializedValue(serializedValue);
++
++      assertThat(chunk.isSerializedPdxInstance()).isTrue();
++    }
++
++    chunk.release();
++  }
++
++  @Test
++  public void isSerializedPdxInstanceShouldReturnFalseIfItsNotPDXInstance() {
++    ObjectChunk chunk = createValueAsSerializedStoredObject(getValue());
++    assertThat(chunk.isSerializedPdxInstance()).isFalse();
++
++    chunk.release();
++  }
++
++  @Test
++  public void checkDataEqualsByChunk() {
++    ObjectChunk chunk1 = createValueAsSerializedStoredObject(getValue());
++    ObjectChunk sameAsChunk1 = chunk1;
++
++    assertThat(chunk1.checkDataEquals(sameAsChunk1)).isTrue();
++
++    ObjectChunk unserializedChunk = createValueAsUnserializedStoredObject(getValue());
++    assertThat(chunk1.checkDataEquals(unserializedChunk)).isFalse();
++
++    ObjectChunk chunkDifferBySize = createValueAsSerializedStoredObject(getValue());
++    chunkDifferBySize.setSize(0);
++    assertThat(chunk1.checkDataEquals(chunkDifferBySize)).isFalse();
++
++    ObjectChunk chunkDifferByValue = createValueAsSerializedStoredObject(Long.MAX_VALUE - 1);
++    assertThat(chunk1.checkDataEquals(chunkDifferByValue)).isFalse();
++
++    ObjectChunk newChunk1 = createValueAsSerializedStoredObject(getValue());
++    assertThat(chunk1.checkDataEquals(newChunk1)).isTrue();
++
++    chunk1.release();
++    unserializedChunk.release();
++    chunkDifferBySize.release();
++    chunkDifferByValue.release();
++    newChunk1.release();
++  }
++
++  @Test
++  public void checkDataEqualsBySerializedValue() {
++    ObjectChunk chunk = createValueAsSerializedStoredObject(getValue());
++    assertThat(chunk.checkDataEquals(new byte[1])).isFalse();
++
++    ObjectChunk chunkDifferByValue = createValueAsSerializedStoredObject(Long.MAX_VALUE - 1);
++    assertThat(chunk.checkDataEquals(chunkDifferByValue.getSerializedValue())).isFalse();
++
++    ObjectChunk newChunk = createValueAsSerializedStoredObject(getValue());
++    assertThat(chunk.checkDataEquals(newChunk.getSerializedValue())).isTrue();
++
++    chunk.release();
++    chunkDifferByValue.release();
++    newChunk.release();
++  }
++
++  @Test
++  public void getDecompressedBytesShouldReturnDecompressedBytesIfCompressed() {
++    Object regionEntryValue = getValue();
++    byte[] regionEntryValueAsBytes = convertValueToByteArray(regionEntryValue);
++
++    boolean isSerialized = true;
++    boolean isCompressed = true;
++
++    ObjectChunk chunk = (ObjectChunk) ma.allocateAndInitialize(regionEntryValueAsBytes, isSerialized, isCompressed);
++
++    RegionEntryContext regionContext = mock(RegionEntryContext.class);
++    CachePerfStats cacheStats = mock(CachePerfStats.class);
++    Compressor compressor = mock(Compressor.class);
++
++    long startTime = 10000L;
++
++    // mock required things
++    when(regionContext.getCompressor()).thenReturn(compressor);
++    when(compressor.decompress(regionEntryValueAsBytes)).thenReturn(regionEntryValueAsBytes);
++    when(regionContext.getCachePerfStats()).thenReturn(cacheStats);
++    when(cacheStats.startDecompression()).thenReturn(startTime);
++
++    // invoke the thing
++    byte[] bytes = chunk.getDecompressedBytes(regionContext);
++
++    // verify the thing happened
++    verify(cacheStats, atLeastOnce()).startDecompression();
++    verify(compressor, times(1)).decompress(regionEntryValueAsBytes);
++    verify(cacheStats, atLeastOnce()).endDecompression(startTime);
++
++    assertArrayEquals(regionEntryValueAsBytes, bytes);
++
++    chunk.release();
++  }
++
++  @Test
++  public void incSizeShouldIncrementSize() {
++    ObjectChunk chunk = createValueAsSerializedStoredObject(getValue());
++
++    int beforeSize = chunk.getSize();
++
++    chunk.incSize(1);
++    assertThat(chunk.getSize()).isEqualTo(beforeSize + 1);
++
++    chunk.incSize(2);
++    assertThat(chunk.getSize()).isEqualTo(beforeSize + 1 + 2);
++
++    chunk.release();
++  }
++
++  @Test
++  public void readyForFreeShouldResetTheRefCount() {
++    ObjectChunk chunk = createValueAsSerializedStoredObject(getValue());
++
++    int refCountBeforeFreeing = chunk.getRefCount();
++    assertThat(refCountBeforeFreeing).isEqualTo(1);
++
++    chunk.readyForFree();
++
++    int refCountAfterFreeing = chunk.getRefCount();
++    assertThat(refCountAfterFreeing).isEqualTo(0);
++  }
++
++  @Test(expected = IllegalStateException.class)
++  public void readyForAllocationShouldThrowExceptionIfAlreadyAllocated() {
++    ObjectChunk chunk = createValueAsSerializedStoredObject(getValue());
++
++    // chunk is already allocated when we created it, so calling readyForAllocation should throw exception.
++    chunk.readyForAllocation();
++
++    chunk.release();
++  }
++
++  @Test
++  public void checkIsAllocatedShouldReturnIfAllocated() {
++    ObjectChunk chunk = createValueAsSerializedStoredObject(getValue());
++    chunk.checkIsAllocated();
++
++    chunk.release();
++  }
++
++  /**
++   * Calling checkIsAllocated on a chunk that was already released must raise
++   * IllegalStateException.
++   */
++  @Test(expected = IllegalStateException.class)
++  public void checkIsAllocatedShouldThrowExceptionIfNotAllocated() {
++    ObjectChunk chunk = createValueAsSerializedStoredObject(getValue());
++    chunk.release();
++    // Deliberately the last statement: a further release() of the already-freed chunk could itself
++    // raise IllegalStateException and hide a checkIsAllocated that failed to throw.
++    chunk.checkIsAllocated();
++  }
++
++  @Test
++  public void sendToShouldWriteSerializedValueToDataOutputIfValueIsSerialized() throws IOException {
++    ObjectChunk chunk = createValueAsSerializedStoredObject(getValue());
++    ObjectChunk spyChunk = spy(chunk);
++
++    HeapDataOutputStream dataOutput = mock(HeapDataOutputStream.class);
++    ByteBuffer directByteBuffer = ByteBuffer.allocate(1024);
++
++    doReturn(directByteBuffer).when(spyChunk).createDirectByteBuffer();
++    doNothing().when(dataOutput).write(directByteBuffer);
++
++    spyChunk.sendTo(dataOutput);
++
++    verify(dataOutput, times(1)).write(directByteBuffer);
++
++    chunk.release();
++  }
++
++  /**
++   * sendTo on an unserialized chunk is expected to write DSCODE.BYTE_ARRAY, then the length as a
++   * single byte, then the raw value bytes.
++   *
++   * @throws IOException if writing to the output stream fails
++   */
++  @Test
++  public void sendToShouldWriteUnserializedValueToDataOutputIfValueIsUnserialized() throws IOException {
++    byte[] regionEntryValue = getValueAsByteArray();
++    ObjectChunk chunk = createValueAsUnserializedStoredObject(regionEntryValue);
++
++    // writeByte is a final method and cannot be mocked, so creating a real one
++    HeapDataOutputStream dataOutput = new HeapDataOutputStream(Version.CURRENT);
++
++    chunk.sendTo(dataOutput);
++
++    byte[] actual = dataOutput.toByteArray();
++
++    byte[] expected = new byte[regionEntryValue.length + 2];
++    expected[0] = DSCODE.BYTE_ARRAY;
++    expected[1] = (byte) regionEntryValue.length;
++    System.arraycopy(regionEntryValue, 0, expected, 2, regionEntryValue.length);
++
++    // The written bytes are the only meaningful check; a null check on the stream created above
++    // could never fail.
++    assertThat(actual).isEqualTo(expected);
++
++    chunk.release();
++  }
++
++  /**
++   * sendAsByteArray is expected to write the length as a single byte followed by the raw value
++   * bytes.
++   *
++   * @throws IOException if writing to the output stream fails
++   */
++  @Test
++  public void sendAsByteArrayShouldWriteValueToDataOutput() throws IOException {
++    byte[] regionEntryValue = getValueAsByteArray();
++    ObjectChunk chunk = createValueAsUnserializedStoredObject(regionEntryValue);
++
++    // writeByte is a final method and cannot be mocked, so creating a real one
++    HeapDataOutputStream dataOutput = new HeapDataOutputStream(Version.CURRENT);
++
++    chunk.sendAsByteArray(dataOutput);
++
++    byte[] actual = dataOutput.toByteArray();
++
++    byte[] expected = new byte[regionEntryValue.length + 1];
++    expected[0] = (byte) regionEntryValue.length;
++    System.arraycopy(regionEntryValue, 0, expected, 1, regionEntryValue.length);
++
++    // The written bytes are the only meaningful check; a null check on the stream created above
++    // could never fail.
++    assertThat(actual).isEqualTo(expected);
++
++    chunk.release();
++  }
++
++  @Test
++  public void createDirectByteBufferShouldCreateAByteBuffer() {
++    byte[] valueBytes = getValueAsByteArray();
++    ObjectChunk chunk = createValueAsUnserializedStoredObject(valueBytes);
++
++    // read back as many bytes as were stored and compare them with the stored value
++    byte[] readBack = new byte[valueBytes.length];
++    chunk.createDirectByteBuffer().get(readBack);
++
++    assertArrayEquals(valueBytes, readBack);
++
++    chunk.release();
++  }
++
++  @Test
++  public void getDirectByteBufferShouldCreateAByteBuffer() {
++    byte[] valueBytes = getValueAsByteArray();
++    ObjectChunk chunk = createValueAsUnserializedStoredObject(valueBytes);
++
++    ByteBuffer directBuffer = chunk.createDirectByteBuffer();
++
++    // the buffer should start at the value, i.e. OFF_HEAP_HEADER_SIZE bytes past the chunk's memory address
++    long expectedAddress = chunk.getMemoryAddress() + ObjectChunk.OFF_HEAP_HEADER_SIZE;
++    long actualAddress = ObjectChunk.getDirectByteBufferAddress(directBuffer);
++
++    assertEquals(expectedAddress, actualAddress);
++
++    chunk.release();
++  }
++
++  @Test(expected = AssertionError.class)
++  public void getAddressForReadingShouldFailIfItsOutsideOfChunk() {
++    ObjectChunk chunk = createValueAsSerializedStoredObject(getValue());
++    try {
++      // asking for one byte more than the chunk's data size is expected to raise AssertionError
++      chunk.getAddressForReading(0, chunk.getDataSize() + 1);
++    } finally {
++      // released in finally so the expected error cannot skip the release
++      chunk.release();
++    }
++  }
++
++  @Test
++  public void getAddressForReadingShouldReturnDataAddressFromGivenOffset() {
++    ObjectChunk chunk = createValueAsSerializedStoredObject(getValue());
++
++    // reading one byte at the offset should resolve to the base data address shifted by that offset
++    int offset = 1;
++    long actualAddress = chunk.getAddressForReading(offset, 1);
++    long expectedAddress = chunk.getBaseDataAddress() + offset;
++
++    assertThat(actualAddress).isEqualTo(expectedAddress);
++
++    chunk.release();
++  }
++
++  @Test
++  public void getSizeInBytesShouldReturnSize() {
++    ObjectChunk chunk = createValueAsSerializedStoredObject(getValue());
++    // getSizeInBytes() and getSize() are expected to report the same value for a chunk
++    assertThat(chunk.getSizeInBytes()).isEqualTo(chunk.getSize());
++
++    chunk.release();
++  }
++
++  @Test(expected = AssertionError.class)
++  public void getUnsafeAddressShouldFailIfOffsetIsNegative() {
++    ObjectChunk chunk = createValueAsSerializedStoredObject(getValue());
++    try {
++      // a negative offset is expected to raise AssertionError
++      chunk.getUnsafeAddress(-1, 1);
++    } finally {
++      // released in finally so the expected error cannot skip the release
++      chunk.release();
++    }
++  }
++
++  @Test(expected = AssertionError.class)
++  public void getUnsafeAddressShouldFailIfSizeIsNegative() {
++    ObjectChunk chunk = createValueAsSerializedStoredObject(getValue());
++    try {
++      // a negative size is expected to raise AssertionError
++      chunk.getUnsafeAddress(1, -1);
++    } finally {
++      // released in finally so the expected error cannot skip the release
++      chunk.release();
++    }
++  }
++
++  @Test(expected = AssertionError.class)
++  public void getUnsafeAddressShouldFailIfItsOutsideOfChunk() {
++    ObjectChunk chunk = createValueAsSerializedStoredObject(getValue());
++    try {
++      // a size one byte larger than the chunk's data size is expected to raise AssertionError
++      chunk.getUnsafeAddress(0, chunk.getDataSize() + 1);
++    } finally {
++      // released in finally so the expected error cannot skip the release
++      chunk.release();
++    }
++  }
++
++  @Test
++  public void getUnsafeAddressShouldReturnUnsafeAddress() {
++    ObjectChunk chunk = createValueAsSerializedStoredObject(getValue());
++
++    // the unsafe address for an offset should be the base data address shifted by that offset
++    int offset = 1;
++    long actualAddress = chunk.getUnsafeAddress(offset, 1);
++    long expectedAddress = chunk.getBaseDataAddress() + offset;
++
++    assertThat(actualAddress).isEqualTo(expectedAddress);
++
++    chunk.release();
++  }
++
++  @Test(expected = AssertionError.class)
++  public void readByteAndWriteByteShouldFailIfOffsetIsOutside() {
++    ObjectChunk chunk = createValueAsSerializedStoredObject(getValue());
++
++    chunk.readByte(chunk.getDataSize() + 1);
++
++    chunk.writeByte(chunk.getDataSize() + 1, Byte.MAX_VALUE);
++
++    chunk.release();
++  }
++
++  @Test
++  public void writeByteShouldWriteAtCorrectLocation() {
++    ObjectChunk chunk = createValueAsSerializedStoredObject(getValue());
++
++    // remember what is stored at offset 2 so the overwrite can be detected
++    byte original = chunk.readByte(2);
++
++    Byte written = Byte.MAX_VALUE;
++    chunk.writeByte(2, written);
++    Byte readBack = chunk.readByte(2);
++
++    // the byte must have changed, and it must now hold the written value
++    assertThat(readBack).isNotEqualTo(original);
++    assertThat(readBack).isEqualTo(written);
++
++    chunk.release();
++  }
++
++  @Test
++  public void retainShouldIncrementRefCount() {
++    ObjectChunk chunk = createValueAsUnserializedStoredObject(getValue());
++    assertThat(chunk.getRefCount()).isEqualTo(1);
++
++    // each retain raises the count by one: 1 -> 2 -> 3
++    for (int expectedCount = 2; expectedCount <= 3; expectedCount++) {
++      chunk.retain();
++      assertThat(chunk.getRefCount()).isEqualTo(expectedCount);
++    }
++
++    // drop all three references
++    for (int i = 0; i < 3; i++) {
++      chunk.release();
++    }
++
++    // once every reference is released, retain is expected to report false
++    boolean retainAfterRelease = chunk.retain();
++    assertThat(retainAfterRelease).isFalse();
++  }
++
++  @Test(expected = IllegalStateException.class)
++  public void retainShouldThrowExceptionAfterMaxNumberOfTimesRetained() {
++    ObjectChunk chunk = createValueAsUnserializedStoredObject(getValue());
++
++    try {
++      // loop through and invoke retain MAX_REF_COUNT-1 times, as creating the chunk above counted as one reference
++      try {
++        for (int i = 0; i < ObjectChunk.MAX_REF_COUNT - 1; i++) {
++          chunk.retain();
++        }
++      } catch (IllegalStateException tooEarly) {
++        // an exception before the limit is reached must fail the test instead of satisfying "expected"
++        throw new AssertionError("retain failed before MAX_REF_COUNT references were held: " + tooEarly);
++      }
++
++      // invoking retain one more time should throw the expected exception
++      chunk.retain();
++    } finally {
++      // drop every reference still held so the chunk is not left retained after the test
++      while (chunk.getRefCount() > 0) {
++        chunk.release();
++      }
++    }
++  }
++
++  @Test
++  public void releaseShouldDecrementRefCount() {
++    ObjectChunk chunk = createValueAsUnserializedStoredObject(getValue());
++    assertThat(chunk.getRefCount()).isEqualTo(1);
++
++    // take two extra references: 1 -> 3
++    for (int i = 0; i < 2; i++) {
++      chunk.retain();
++    }
++    assertThat(chunk.getRefCount()).isEqualTo(3);
++
++    // each release lowers the count by one: 3 -> 2 -> 1
++    for (int expectedCount = 2; expectedCount >= 1; expectedCount--) {
++      chunk.release();
++      assertThat(chunk.getRefCount()).isEqualTo(expectedCount);
++    }
++
++    // a retain followed by a release leaves the count unchanged
++    chunk.retain();
++    chunk.release();
++    assertThat(chunk.getRefCount()).isEqualTo(1);
++
++    // releasing the last reference brings the count down to zero
++    chunk.release();
++    assertThat(chunk.getRefCount()).isEqualTo(0);
++  }
++
++  @Test(expected = IllegalStateException.class)
++  public void releaseShouldThrowExceptionIfChunkIsAlreadyReleased() {
++    ObjectChunk chunk = createValueAsUnserializedStoredObject(getValue());
++    // first release: gives up the reference held since creation
++    chunk.release();
++    // second release: the chunk is already released, so this call is expected to throw
++    chunk.release();
++  }
++
++  @Test
++  public void testToStringForOffHeapByteSource() {
++    ObjectChunk chunk = createValueAsUnserializedStoredObject(getValue());
++
++    // the description must end with the data size, ref count and hex memory address
++    String expectedSuffix = ":<dataSize=" + chunk.getDataSize() + " refCount=" + chunk.getRefCount() + " addr=" + Long.toHexString(chunk.getMemoryAddress()) + ">";
++    String description = chunk.toStringForOffHeapByteSource();
++    assertThat(description).endsWith(expectedSuffix);
++
++    // toString should delegate to toStringForOffHeapByteSource exactly once
++    ObjectChunk spiedChunk = spy(chunk);
++    spiedChunk.toString();
++    verify(spiedChunk, times(1)).toStringForOffHeapByteSource();
++
++    chunk.release();
++  }
++
++  @Test
++  public void getStateShouldReturnAllocatedIfRefCountIsGreaterThanZero() {
++    ObjectChunk chunk = createValueAsUnserializedStoredObject(getValue());
++
++    // the freshly created chunk has not been released yet
++    State stateWhileHeld = chunk.getState();
++    assertEquals(State.ALLOCATED, stateWhileHeld);
++
++    chunk.release();
++  }
++
++  @Test
++  public void getStateShouldReturnDeallocatedIfRefCountIsZero() {
++    ObjectChunk chunk = createValueAsUnserializedStoredObject(getValue());
++
++    // the state is queried only after the chunk has been released
++    chunk.release();
++    State stateAfterRelease = chunk.getState();
++
++    assertEquals(State.DEALLOCATED, stateAfterRelease);
++  }
++
++  @Test(expected = UnsupportedOperationException.class)
++  public void getNextBlockShouldThrowUnSupportedOperationException() {
++    ObjectChunk chunk = createValueAsUnserializedStoredObject(getValue());
++    try {
++      // getNextBlock is expected to throw UnsupportedOperationException
++      chunk.getNextBlock();
++    } finally {
++      // released in finally so the expected exception cannot skip the release
++      chunk.release();
++    }
++  }
++
++  @Test
++  public void getBlockSizeShouldBeSameSameGetSize() {
++    ObjectChunk chunk = createValueAsUnserializedStoredObject(getValue());
++    // getBlockSize() is expected to report the same value as getSize()
++    assertEquals(chunk.getSize(), chunk.getBlockSize());
++
++    chunk.release();
++  }
++
++  @Test(expected = UnsupportedOperationException.class)
++  public void copyBytesShouldThrowUnSupportedOperationException() {
++    ObjectChunk chunk = createValueAsUnserializedStoredObject(getValue());
++    try {
++      // copyBytes is expected to throw UnsupportedOperationException
++      chunk.copyBytes(1, 2, 1);
++    } finally {
++      // released in finally so the expected exception cannot skip the release
++      chunk.release();
++    }
++  }
++
++  @Test(expected = UnsupportedOperationException.class)
++  public void getSlabIdShouldThrowUnSupportedOperationException() {
++    ObjectChunk chunk = createValueAsUnserializedStoredObject(getValue());
++    try {
++      // getSlabId is expected to throw UnsupportedOperationException
++      chunk.getSlabId();
++    } finally {
++      // released in finally so the expected exception cannot skip the release
++      chunk.release();
++    }
++  }
++
++  @Test
++  public void getFreeListIdShouldReturnMinusOne() {
++    ObjectChunk chunk = createValueAsUnserializedStoredObject(getValue());
++    // the free list id of this chunk is expected to be -1
++    assertThat(chunk.getFreeListId()).isEqualTo(-1);
++
++    chunk.release();
++  }
++
++  @Test
++  public void getDataTypeShouldReturnNull() {
++    ObjectChunk chunk = createValueAsUnserializedStoredObject(getValue());
++    // getDataType() is expected to return null for this chunk
++    assertThat(chunk.getDataType()).isNull();
++
++    chunk.release();
++  }
++
++  @Test
++  public void getDataDataShouldReturnNull() {
++    ObjectChunk chunk = createValueAsUnserializedStoredObject(getValue());
++    // getDataValue() is expected to return null for this chunk
++    assertThat(chunk.getDataValue()).isNull();
++
++    // release the chunk like the other tests in this class do
++    chunk.release();
++  }
++
++  @Test(expected = UnsupportedOperationException.class)
++  public void getRawBytesShouldThrowExceptionIfValueIsCompressed() {
++    Object regionEntryValue = getValue();
++    byte[] regionEntryValueAsBytes = convertValueToByteArray(regionEntryValue);
++
++    boolean isSerialized = true;
++    boolean isCompressed = true;
++
++    ObjectChunk chunk = (ObjectChunk) ma.allocateAndInitialize(regionEntryValueAsBytes, isSerialized, isCompressed);
++
++    try {
++      // getRawBytes on a chunk allocated as compressed is expected to throw UnsupportedOperationException
++      chunk.getRawBytes();
++    } finally {
++      // released in finally so the expected exception cannot skip the release
++      chunk.release();
++    }
++  }
++
++  @Test
++  public void getSerializedValueShouldSerializeTheValue() {
++    Object value = getValue();
++    byte[] valueBytes = convertValueToByteArray(value);
++
++    // allocate the bytes as neither serialized nor compressed
++    ObjectChunk chunk = (ObjectChunk) ma.allocateAndInitialize(valueBytes, false, false);
++
++    // the chunk's serialized form should equal serializing the stored byte array directly
++    byte[] expected = EntryEventImpl.serialize(valueBytes);
++    byte[] actual = chunk.getSerializedValue();
++
++    assertThat(actual).isEqualTo(expected);
++
++    chunk.release();
++  }
++
++  @Test
++  public void fillShouldFillTheChunk() {
++    // allocate 100 zero bytes, flagged as neither serialized nor compressed
++    byte[] payload = new byte[100];
++    ObjectChunk chunk = (ObjectChunk) ma.allocateAndInitialize(payload, false, false);
++
++    // fill starting from the chunk's memory address, then let the chunk validate the fill
++    long chunkAddress = chunk.getMemoryAddress();
++    ObjectChunk.fill(chunkAddress);
++    chunk.validateFill();
++
++    chunk.release();
++  }
++}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/ObjectChunkSliceJUnitTest.java
----------------------------------------------------------------------
diff --cc geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/ObjectChunkSliceJUnitTest.java
index 0000000,0000000..fe55910
new file mode 100644
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/ObjectChunkSliceJUnitTest.java
@@@ -1,0 -1,0 +1,72 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements.  See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.  You may obtain a copy of the License at
++ *
++ *      http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package com.gemstone.gemfire.internal.offheap;
++
++import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.assertNotNull;
++
++import org.junit.Test;
++import org.junit.experimental.categories.Category;
++
++import com.gemstone.gemfire.test.junit.categories.UnitTest;
++
++/**
++ * Unit tests for the {@link ObjectChunkSlice} obtained from {@code ObjectChunk.slice(position, end)}.
++ * <p>
++ * Every test releases the chunk it creates in a {@code finally} block so a failing
++ * assertion cannot leave the chunk retained.
++ */
++@Category(UnitTest.class)
++public class ObjectChunkSliceJUnitTest extends ObjectChunkJUnitTest {
++
++  /** The slice's data size should be the distance between the requested end and position. */
++  @Test
++  public void sliceShouldHaveAValidDataSize() {
++    int position = 1;
++    int end = 2;
++
++    ObjectChunk chunk = createValueAsUnserializedStoredObject(getValue());
++    try {
++      ObjectChunkSlice slice = (ObjectChunkSlice) chunk.slice(position, end);
++
++      assertNotNull(slice);
++      assertEquals(ObjectChunkSlice.class, slice.getClass());
++
++      assertEquals(end - position, slice.getDataSize());
++    } finally {
++      chunk.release();
++    }
++  }
++
++  /** The slice's base data address should be the chunk's base data address shifted by the position. */
++  @Test
++  public void sliceShouldHaveAValidBaseDataAddress() {
++    int position = 1;
++    int end = 2;
++
++    ObjectChunk chunk = createValueAsUnserializedStoredObject(getValue());
++    try {
++      ObjectChunkSlice slice = (ObjectChunkSlice) chunk.slice(position, end);
++
++      assertNotNull(slice);
++      assertEquals(ObjectChunkSlice.class, slice.getClass());
++
++      assertEquals(chunk.getBaseDataAddress() + position, slice.getBaseDataAddress());
++    } finally {
++      chunk.release();
++    }
++  }
++
++  /** The slice's base data offset should be the chunk's base data offset shifted by the position. */
++  @Test
++  public void sliceShouldHaveAValidBaseOffset() {
++    int position = 1;
++    int end = 2;
++
++    ObjectChunk chunk = createValueAsUnserializedStoredObject(getValue());
++    try {
++      ObjectChunkSlice slice = (ObjectChunkSlice) chunk.slice(position, end);
++
++      assertNotNull(slice);
++      assertEquals(ObjectChunkSlice.class, slice.getClass());
++
++      assertEquals(chunk.getBaseDataOffset() + position, slice.getBaseDataOffset());
++    } finally {
++      chunk.release();
++    }
++  }
++}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/ObjectChunkWithHeapFormJUnitTest.java
----------------------------------------------------------------------
diff --cc geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/ObjectChunkWithHeapFormJUnitTest.java
index 0000000,0000000..4486845
new file mode 100644
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/ObjectChunkWithHeapFormJUnitTest.java
@@@ -1,0 -1,0 +1,64 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements.  See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.  You may obtain a copy of the License at
++ *
++ *      http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package com.gemstone.gemfire.internal.offheap;
++
++import static org.junit.Assert.assertArrayEquals;
++import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.assertNotNull;
++import static org.junit.Assert.assertNotSame;
++import static org.junit.Assert.assertSame;
++
++import org.junit.Test;
++import org.junit.experimental.categories.Category;
++
++import com.gemstone.gemfire.test.junit.categories.UnitTest;
++
++/**
++ * Unit tests for {@link ObjectChunkWithHeapForm}, which is constructed from an
++ * {@link ObjectChunk} together with a byte array.
++ * <p>
++ * Every test releases the chunk it creates in a {@code finally} block so a failing
++ * assertion cannot leave the chunk retained.
++ */
++@Category(UnitTest.class)
++public class ObjectChunkWithHeapFormJUnitTest extends ObjectChunkJUnitTest {
++
++  /** getRawBytes should hand back the very array instance given to the constructor. */
++  @Test
++  public void getRawBytesShouldReturnCachedHeapForm() {
++    ObjectChunk chunk = createValueAsUnserializedStoredObject(getValue());
++    try {
++      byte[] valueInBytes = getValueAsByteArray();
++      ObjectChunkWithHeapForm heapForm = new ObjectChunkWithHeapForm(chunk, valueInBytes);
++
++      assertNotNull(heapForm);
++
++      assertSame(valueInBytes, heapForm.getRawBytes());
++    } finally {
++      chunk.release();
++    }
++  }
++
++  /**
++   * getChunkWithoutHeapForm should return a plain ObjectChunk that equals the original chunk,
++   * shares its memory address, and does not return the constructor's array from getRawBytes.
++   */
++  @Test
++  public void getChunkWithoutHeapFormShouldReturnGemFireChunk() {
++    ObjectChunk chunk = createValueAsSerializedStoredObject(getValue());
++    try {
++      byte[] valueInBytes = getValueAsByteArray();
++      ObjectChunkWithHeapForm heapForm = new ObjectChunkWithHeapForm(chunk, valueInBytes);
++
++      ObjectChunk chunkWithOutHeapForm = heapForm.getChunkWithoutHeapForm();
++
++      assertNotNull(chunkWithOutHeapForm);
++      assertEquals(ObjectChunk.class, chunkWithOutHeapForm.getClass());
++
++      assertEquals(chunk, heapForm.getChunkWithoutHeapForm());
++
++      assertEquals(chunk.getMemoryAddress(), chunkWithOutHeapForm.getMemoryAddress());
++      assertArrayEquals(chunk.getRawBytes(), chunkWithOutHeapForm.getRawBytes());
++      assertNotSame(valueInBytes, chunkWithOutHeapForm.getRawBytes());
++    } finally {
++      chunk.release();
++    }
++  }
++}