Posted to commits@geode.apache.org by kl...@apache.org on 2016/02/23 21:23:56 UTC

[60/94] [abbrv] incubator-geode git commit: GEODE-917: Merge branch 'feature/GEODE-917' into develop

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/OffHeapValidationJUnitTest.java
----------------------------------------------------------------------
diff --cc geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/OffHeapValidationJUnitTest.java
index 0000000,2d86296..630ae22
mode 000000,100755..100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/OffHeapValidationJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/OffHeapValidationJUnitTest.java
@@@ -1,0 -1,540 +1,540 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package com.gemstone.gemfire.internal.offheap;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertNotNull;
+ import static org.junit.Assert.assertNull;
+ import static org.junit.Assert.assertTrue;
+ import static org.junit.Assert.fail;
+ 
+ import java.io.IOException;
+ import java.io.Serializable;
+ import java.sql.Timestamp;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.Comparator;
+ import java.util.Date;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.Hashtable;
+ import java.util.IdentityHashMap;
+ import java.util.LinkedHashSet;
+ import java.util.LinkedList;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Properties;
+ import java.util.Stack;
+ import java.util.TreeMap;
+ import java.util.TreeSet;
+ import java.util.UUID;
+ import java.util.Vector;
+ 
+ import org.junit.After;
+ import org.junit.Before;
+ import org.junit.Test;
+ import org.junit.experimental.categories.Category;
+ 
+ import com.gemstone.gemfire.DataSerializer;
+ import com.gemstone.gemfire.cache.CacheFactory;
+ import com.gemstone.gemfire.cache.Region;
+ import com.gemstone.gemfire.cache.RegionShortcut;
+ import com.gemstone.gemfire.compression.SnappyCompressor;
+ import com.gemstone.gemfire.internal.HeapDataOutputStream;
+ import com.gemstone.gemfire.internal.Version;
+ import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+ import com.gemstone.gemfire.internal.cache.LocalRegion;
+ import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+ 
+ /**
+  * Basic integration tests for validating the off-heap implementation. 
+  * 
+  * @author Kirk Lund
+  */
+ @Category(IntegrationTest.class)
+ public class OffHeapValidationJUnitTest {
+ 
+   private GemFireCacheImpl cache;
+   
+   @Before
+   public void setUp() throws Exception {
+     this.cache = createCache();
+   }
+ 
+   @After
+   public void tearDown() throws Exception {
+     closeCache(this.cache);
+   }
+ 
+   protected GemFireCacheImpl createCache() {
+     Properties props = new Properties();
+     props.setProperty("locators", "");
+     props.setProperty("mcast-port", "0");
+     props.setProperty("off-heap-memory-size", getOffHeapMemorySize());
+     GemFireCacheImpl result = (GemFireCacheImpl) new CacheFactory(props).create();
+     return result;
+   }
+   
+   protected void closeCache(GemFireCacheImpl gfc) {
+     gfc.close();
+   }
+   
+   protected String getOffHeapMemorySize() {
+     return "2m";
+   }
+   
+   protected RegionShortcut getRegionShortcut() {
+     return RegionShortcut.REPLICATE;
+   }
+   
+   protected String getRegionName() {
+     return "region1";
+   }
+   
+   @Test
+   public void testMemoryInspection() throws IOException {
+     // validate initial state
+     MemoryAllocator allocator = this.cache.getOffHeapStore();
+     assertNotNull(allocator);
+     MemoryInspector inspector = allocator.getMemoryInspector();
+     assertNotNull(inspector);
+     inspector.createSnapshot();
+     try {
+       MemoryBlock firstBlock = inspector.getFirstBlock();
+       assertNotNull(firstBlock);
+       assertEquals(1024*1024*2, firstBlock.getBlockSize());
+       assertEquals("N/A", firstBlock.getDataType());
+       assertEquals(-1, firstBlock.getFreeListId());
+       assertTrue(firstBlock.getMemoryAddress() > 0);
+       assertNull(firstBlock.getNextBlock());
+       assertEquals(0, firstBlock.getRefCount());
+       assertEquals(0, firstBlock.getSlabId());
+       assertEquals(MemoryBlock.State.UNUSED, firstBlock.getState());
+       assertFalse(firstBlock.isCompressed());
+       assertFalse(firstBlock.isSerialized());
+     } finally {
+       inspector.clearSnapshot();
+     }
+     
+     // create off-heap region
+     Region<Object, Object> region = this.cache.createRegionFactory(getRegionShortcut()).setOffHeap(true).create(getRegionName());
+     Region<Object, Object> compressedRegion = this.cache.createRegionFactory(getRegionShortcut()).setOffHeap(true).setCompressor(SnappyCompressor.getDefaultInstance()).create(getRegionName()+"Compressed");
+     
+     // perform some ops
+     List<ExpectedValues> expected = new ArrayList<ExpectedValues>();
+ 
+     // Chunk.OFF_HEAP_HEADER_SIZE + 4 ?
+     
+     putString(region, expected);
+     putCompressedString(compressedRegion, expected);
+     putDate(region, expected);
+     putByteArray(region, expected);
+     putCompressedByteArray(compressedRegion, expected);
+     putByteArrayArray(region, expected);
+     putShortArray(region, expected);
+     putStringArray(region, expected);
+     putObjectArray(region, expected);
+     putArrayList(region, expected);
+     putLinkedList(region, expected);
+     putHashSet(region, expected);
+     putLinkedHashSet(region, expected);
+     putHashMap(region, expected);
+     putIdentityHashMap(region, expected);
+     putHashtable(region, expected);
+     putProperties(region, expected);
+     putVector(region, expected);
+     putStack(region, expected);
+     putTreeMap(region, expected);
+     putTreeSet(region, expected);
+     putClass(region, expected);
+     putUUID(region, expected);
+     putTimestamp(region, expected);
+     putSerializableClass(region, expected);
+     
+     // TODO: USER_DATA_SERIALIZABLE
+     
+     // TODO: PDX
+     
+     // TODO: PDX_ENUM
+     
+     // TODO: GEMFIRE_ENUM
+     
+     // TODO: PDX_INLINE_ENUM
+     
+     // validate inspection
+     inspector.createSnapshot();
+     try {
+     MemoryBlock firstBlock = inspector.getFirstBlock();
+     assertEquals(MemoryBlock.State.UNUSED, firstBlock.getState());
+     
+     //System.out.println(((SimpleMemoryAllocatorImpl)inspector).getSnapshot());
+     
+     // sort the ExpectedValues into the same order as the MemoryBlocks from the inspector
+     Collections.sort(expected, 
+         new Comparator<ExpectedValues>() {
+           @Override
+           public int compare(ExpectedValues o1, ExpectedValues o2) {
+             return Long.valueOf(o1.memoryAddress).compareTo(o2.memoryAddress);
+           }
+     });
+     
+     int i = 0;
+     MemoryBlock block = firstBlock.getNextBlock();
+     while (block != null) {
+       ExpectedValues values = expected.get(i);
+       assertEquals(i + ":" + values.dataType, values.blockSize, block.getBlockSize());
+       assertEquals(i + ":" + values.dataType, values.dataType, block.getDataType());
+       assertEquals(i + ":" + values.dataType, values.freeListId, block.getFreeListId());
+       assertEquals(i + ":" + values.dataType, values.memoryAddress, block.getMemoryAddress());
+       assertEquals(i + ":" + values.dataType, values.refCount, block.getRefCount());
+       assertEquals(i + ":" + values.dataType, values.slabId, block.getSlabId());
+       assertEquals(i + ":" + values.dataType, values.isCompressed, block.isCompressed());
+       assertEquals(i + ":" + values.dataType, values.isSerialized, block.isSerialized());
+       // compare block.getDataValue() but only for String types
+       if (values.dataType.equals("java.lang.String")) {
+         Object obj = block.getDataValue();
+         assertNotNull(block.toString(), obj);
+         assertTrue(obj instanceof String);
+         assertEquals("this is a string", (String)obj);
+       }
+       if ((values.dataType.contains("byte [") && values.dataType.lastIndexOf('[') == values.dataType.indexOf('[')) || values.dataType.startsWith("compressed")) {
+         assertTrue("for dataType=" + values.dataType + " expected " + Arrays.toString((byte[])values.dataValue) + " but was " + Arrays.toString((byte[])block.getDataValue()),
+             Arrays.equals((byte[])values.dataValue, (byte[])block.getDataValue()));
+       } else if (values.dataType.contains("[")) {
+         // TODO: multiple dimension arrays or non-byte arrays
+       } else if (values.dataValue instanceof Collection) {
+         int diff = joint((Collection<?>)values.dataValue, (Collection<?>)block.getDataValue());
+         assertEquals(i + ":" + values.dataType, 0, diff);
+       } else if (values.dataValue instanceof IdentityHashMap) {
+         // TODO
+       } else if (values.dataValue instanceof Map) {
+         int diff = joint((Map<?,?>)values.dataValue, (Map<?,?>)block.getDataValue());
+         assertEquals(i + ":" + values.dataType, 0, diff);
+       } else {
+         assertEquals(i + ":" + values.dataType, values.dataValue, block.getDataValue());
+       }
+       block = block.getNextBlock();
+       i++;
+     }
+     assertEquals("All blocks: "+inspector.getAllBlocks(), expected.size(), i);
+     } finally {
+       inspector.clearSnapshot();
+     }
+     
+     // perform more ops
+ 
+     // validate more inspection
+     
+   }
+   
+   /**
+    * Returns -1 if c1 is missing an element that is in c2, 1 if c2 is missing an
+    * element that is in c1, or 0 if they contain exactly the same elements.
+    * @throws NullPointerException if either c1 or c2 is null
+    */
+   private static int joint(Collection<?> c1, Collection<?> c2) {
+     if (c1.size() < c2.size()) {
+       return -1;
+     } else if (c2.size() < c1.size()) {
+       return 1;
+     }
+     Collection<Object> c3 = new ArrayList<Object>();
+     c3.addAll(c1);
+     c3.removeAll(c2);
+     if (c3.size() > 0) {
+       return -1;
+     }
+     c3.addAll(c2);
+     c3.removeAll(c1);
+     if (c3.size() > 0) {
+       return 1;
+     }
+     return 0;
+   }
+   
+   /**
+    * Returns -1 if m1 is missing a key that is in m2, 1 if m2 is missing a key
+    * that is in m1, or 0 if they contain exactly the same keys.
+    * @throws NullPointerException if either m1 or m2 is null
+    */
+   private static int joint(Map<?, ?> m1, Map<?, ?> m2) {
+     if (m1.size() < m2.size()) {
+       return -1;
+     } else if (m2.size() < m1.size()) {
+       return 1;
+     }
+     Collection<Object> c3 = new ArrayList<Object>();
+     c3.addAll(m1.keySet());
+     c3.removeAll(m2.keySet());
+     if (c3.size() > 0) {
+       return -1;
+     }
+     c3.addAll(m2.keySet());
+     c3.removeAll(m1.keySet());
+     if (c3.size() > 0) {
+       return 1;
+     }
+     return 0;
+   }
+   
+   private long getMemoryAddress(Region region, String key) {
+     Object entry = ((LocalRegion) region).getRegionEntry(key)._getValue();
 -    assertTrue(entry instanceof Chunk);
 -    long memoryAddress = ((Chunk)entry).getMemoryAddress();
++    assertTrue(entry instanceof ObjectChunk);
++    long memoryAddress = ((ObjectChunk)entry).getMemoryAddress();
+     assertTrue(memoryAddress > 0);
+     return memoryAddress;
+   }
+   
+   private void putString(Region<Object, Object> region, List<ExpectedValues> expected) {
+     String key = "keyString";
+     String value = "this is a string";
+     region.put(key, value);
+     expected.add(new ExpectedValues(value, value.length()*2, "java.lang.String", -1, getMemoryAddress(region, key), 1, 0, false, true));
+   }
+   
+   private void putCompressedString(Region<Object, Object> region, List<ExpectedValues> expected) throws IOException {
+     String key = "keyString";
+     String value = "this is a string";
+     region.put(key, value);
+     HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT);
+     DataSerializer.writeObject(value, hdos);
+     byte[] uncompressedBytes = hdos.toByteArray();
+     byte[] expectedValue = SnappyCompressor.getDefaultInstance().compress(uncompressedBytes);
+     expected.add(new ExpectedValues(expectedValue, 32, "compressed object of size " + expectedValue.length, -1, getMemoryAddress(region, key), 1, 0, true, true));
+   }
+ 
+   private void putDate(Region<Object, Object> region, List<ExpectedValues> expected) {
+     String key = "keyDate";
+     Date value = new Date();
+     region.put(key, value);
+     expected.add(new ExpectedValues(value, 24, "java.util.Date", -1, getMemoryAddress(region, key), 1, 0, false, true));
+   }
+   
+   private void putByteArray(Region<Object, Object> region, List<ExpectedValues> expected) {
+     String key = "keyByteArray";
+     byte[] value = new byte[10];
+     region.put(key, value);
+     expected.add(new ExpectedValues(value, 24, "byte[10]", -1, getMemoryAddress(region, key), 1, 0, false, false));
+   }
+   private void putCompressedByteArray(Region<Object, Object> region, List<ExpectedValues> expected) throws IOException {
+     String key = "keyByteArray";
+     byte[] value = new byte[10];
+     region.put(key, value);
+     byte[] expectedValue = SnappyCompressor.getDefaultInstance().compress(value);
+     expected.add(new ExpectedValues(expectedValue, 24, "compressed byte[" + expectedValue.length + "]", -1, getMemoryAddress(region, key), 1, 0, true, false));
+   }
+   
+   private void putByteArrayArray(Region<Object, Object> region, List<ExpectedValues> expected) {
+     String key = "keyByteArrayArray";
+     byte[][] value = new byte[10][10];
+     region.put(key, value);
+     expected.add(new ExpectedValues(value, 120, "byte[][]", -1, getMemoryAddress(region, key), 1, 0, false, true));
+   }
+   
+   private void putShortArray(Region<Object, Object> region, List<ExpectedValues> expected) {
+     String key = "keyShortArray(";
+     short[] value = new short[10];
+     region.put(key, value);
+     expected.add(new ExpectedValues(value, 32, "short[]", -1, getMemoryAddress(region, key), 1, 0, false, true));
+   }
+   
+   private void putStringArray(Region<Object, Object> region, List<ExpectedValues> expected) {
+     String key = "keyStringArray";
+     String[] value = new String[10];
+     region.put(key, value);
+     expected.add(new ExpectedValues(value, 24, "java.lang.String[]", -1, getMemoryAddress(region, key), 1, 0, false, true));
+   }
+   
+   private void putObjectArray(Region<Object, Object> region, List<ExpectedValues> expected) {
+     String key = "keyObjectArray";
+     Object[] value = new Object[10];
+     region.put(key, value);
+     expected.add(new ExpectedValues(value, 40, "java.lang.Object[]", -1, getMemoryAddress(region, key), 1, 0, false, true));
+   }
+   
+   private void putArrayList(Region<Object, Object> region, List<ExpectedValues> expected) {
+     String key = "keyArrayList";
+     ArrayList<Object> value = new ArrayList<Object>();
+     value.add("string 1");
+     value.add("string 2");
+     region.put(key, value);
+     expected.add(new ExpectedValues(value, 32, "java.util.ArrayList", -1, getMemoryAddress(region, key), 1, 0, false, true));
+   }
+   
+   private void putLinkedList(Region<Object, Object> region, List<ExpectedValues> expected) {
+     String key = "keyLinkedList";
+     LinkedList<Object> value = new LinkedList<Object>();
+     value.add("string 1");
+     value.add("string 2");
+     region.put(key, value);
+     expected.add(new ExpectedValues(value, 32, "java.util.LinkedList", -1, getMemoryAddress(region, key), 1, 0, false, true));
+   }
+   
+   private void putHashSet(Region<Object, Object> region, List<ExpectedValues> expected) {
+     String key = "keyHashSet";
+     HashSet<Object> value = new HashSet<Object>();
+     value.add("string 1");
+     value.add("string 2");
+     region.put(key, value);
+     expected.add(new ExpectedValues(value, 32, "java.util.HashSet", -1, getMemoryAddress(region, key), 1, 0, false, true));
+   }
+   
+   private void putLinkedHashSet(Region<Object, Object> region, List<ExpectedValues> expected) {
+     String key = "keyLinkedHashSet";
+     LinkedHashSet<Object> value = new LinkedHashSet<Object>();
+     value.add("string 1");
+     value.add("string 2");
+     region.put(key, value);
+     expected.add(new ExpectedValues(value, 32, "java.util.LinkedHashSet", -1, getMemoryAddress(region, key), 1, 0, false, true));
+   }
+   
+   private void putHashMap(Region<Object, Object> region, List<ExpectedValues> expected) {
+     String key = "keyHashMap";
+     HashMap<Object,Object> value = new HashMap<Object,Object>();
+     value.put("1", "string 1");
+     value.put("2", "string 2");
+     region.put(key, value);
+     expected.add(new ExpectedValues(value, 40, "java.util.HashMap", -1, getMemoryAddress(region, key), 1, 0, false, true));
+   }
+ 
+   private void putIdentityHashMap(Region<Object, Object> region, List<ExpectedValues> expected) {
+     String key = "keyIdentityHashMap";
+     IdentityHashMap<Object,Object> value = new IdentityHashMap<Object,Object>();
+     value.put("1", "string 1");
+     value.put("2", "string 2");
+     region.put(key, value);
+     expected.add(new ExpectedValues(value, 40, "java.util.IdentityHashMap", -1, getMemoryAddress(region, key), 1, 0, false, true));
+   }
+   
+   private void putHashtable(Region<Object, Object> region, List<ExpectedValues> expected) {
+     String key = "keyHashtable";
+     Hashtable<Object,Object> value = new Hashtable<Object,Object>();
+     value.put("1", "string 1");
+     value.put("2", "string 2");
+     region.put(key, value);
+     expected.add(new ExpectedValues(value, 40, "java.util.Hashtable", -1, getMemoryAddress(region, key), 1, 0, false, true));
+   }
+   
+   private void putProperties(Region<Object, Object> region, List<ExpectedValues> expected) {
+     String key = "keyProperties";
+     Properties value = new Properties();
+     value.put("1", "string 1");
+     value.put("2", "string 2");
+     region.put(key, value);
+     expected.add(new ExpectedValues(value, 40, "java.util.Properties", -1, getMemoryAddress(region, key), 1, 0, false, true));
+   }
+   
+   private void putVector(Region<Object, Object> region, List<ExpectedValues> expected) {
+     String key = "keyVector";
+     Vector<String> value = new Vector<String>();
+     value.add("string 1");
+     value.add("string 2");
+     region.put(key, value);
+     expected.add(new ExpectedValues(value, 32, "java.util.Vector", -1, getMemoryAddress(region, key), 1, 0, false, true));
+   }
+   
+   private void putStack(Region<Object, Object> region, List<ExpectedValues> expected) {
+     String key = "keyStack";
+     Stack<String> value = new Stack<String>();
+     value.add("string 1");
+     value.add("string 2");
+     region.put(key, value);
+     expected.add(new ExpectedValues(value, 32, "java.util.Stack", -1, getMemoryAddress(region, key), 1, 0, false, true));
+   }
+   
+   private void putTreeMap(Region<Object, Object> region, List<ExpectedValues> expected) {
+     String key = "keyTreeMap";
+     TreeMap<String, String> value = new TreeMap<String, String>();
+     value.put("1", "string 1");
+     value.put("2", "string 2");
+     region.put(key, value);
+     expected.add(new ExpectedValues(value, 48, "java.util.TreeMap", -1, getMemoryAddress(region, key), 1, 0, false, true));
+   }
+   
+   private void putTreeSet(Region<Object, Object> region, List<ExpectedValues> expected) {
+     String key = "keyTreeSet";
+     TreeSet<String> value = new TreeSet<String>();
+     value.add("string 1");
+     value.add("string 2");
+     region.put(key, value);
+     expected.add(new ExpectedValues(value, 40, "java.util.TreeSet", -1, getMemoryAddress(region, key), 1, 0, false, true));
+   }
+ 
+   private void putClass(Region<Object, Object> region, List<ExpectedValues> expected) {
+     String key = "keyClass";
+     Class<String> value = String.class;
+     region.put(key, value);
+     expected.add(new ExpectedValues(value, 32, "java.lang.Class", -1, getMemoryAddress(region, key), 1, 0, false, true));
+   }
+   
+   private void putUUID(Region<Object, Object> region, List<ExpectedValues> expected) {
+     String key = "keyUUID";
+     UUID value = UUID.randomUUID(); 
+     region.put(key, value);
+     expected.add(new ExpectedValues(value, 32, "java.util.UUID", -1, getMemoryAddress(region, key), 1, 0, false, true));
+   }
+   
+   private void putTimestamp(Region<Object, Object> region, List<ExpectedValues> expected) {
+     String key = "keyTimestamp";
+     Timestamp value = new Timestamp(System.currentTimeMillis());
+     region.put(key, value);
+     expected.add(new ExpectedValues(value, 24, "java.sql.Timestamp", -1, getMemoryAddress(region, key), 1, 0, false, true));
+   }
+ 
+   private void putSerializableClass(Region<Object, Object> region, List<ExpectedValues> expected) {
+     String key = "keySerializableClass";
+     SerializableClass value = new SerializableClass();
+     region.put(key, value);
+     expected.add(new ExpectedValues(value, 112, "java.io.Serializable:" + SerializableClass.class.getName(), -1, getMemoryAddress(region, key), 1, 0, false, true));
+   }
+   
+   static class ExpectedValues {
+     final Object dataValue;
+     final int blockSize;
+     final String dataType;
+     final int freeListId;
+     final long memoryAddress;
+     final int refCount;
+     final int slabId;
+     final boolean isCompressed;
+     final boolean isSerialized;
+     ExpectedValues(Object dataValue, int blockSize, String dataType, int freeListId, long memoryAddress, int refCount, int slabId, boolean isCompressed, boolean isSerialized) {
+       this.dataValue = dataValue;
+       this.blockSize = blockSize;
+       this.dataType = dataType;
+       this.freeListId = freeListId;
+       this.memoryAddress = memoryAddress;
+       this.refCount = refCount;
+       this.slabId = slabId;
+       this.isCompressed = isCompressed;
+       this.isSerialized = isSerialized;
+     }
+   }
+   
+   @SuppressWarnings("serial")
+   public static class SerializableClass implements Serializable {
+     public boolean equals(Object obj) {
+       return obj instanceof SerializableClass;
+     }
+     public int hashCode() {
+       return 42;
+     }
+   }
+ }
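
The testMemoryInspection walk above reduces to a simple snapshot iteration over
MemoryBlocks. A minimal sketch of that pattern (not part of the diff; it uses only
the MemoryInspector calls shown in the test and assumes a cache created as in
createCache()):

    MemoryAllocator allocator = cache.getOffHeapStore();
    MemoryInspector inspector = allocator.getMemoryInspector();
    inspector.createSnapshot();            // freeze a view of all off-heap blocks
    try {
      for (MemoryBlock block = inspector.getFirstBlock(); block != null; block = block.getNextBlock()) {
        // each block reports its size, data type, ref count, and serialization/compression flags
        System.out.println(block.getDataType() + " @" + block.getMemoryAddress()
            + " size=" + block.getBlockSize() + " refCount=" + block.getRefCount()
            + " serialized=" + block.isSerialized() + " compressed=" + block.isCompressed());
      }
    } finally {
      inspector.clearSnapshot();           // always release the snapshot
    }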

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/OffHeapWriteObjectAsByteArrayJUnitTest.java
----------------------------------------------------------------------
diff --cc geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/OffHeapWriteObjectAsByteArrayJUnitTest.java
index 0000000,daebefa..9c83f5b
mode 000000,100644..100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/OffHeapWriteObjectAsByteArrayJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/OffHeapWriteObjectAsByteArrayJUnitTest.java
@@@ -1,0 -1,115 +1,115 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package com.gemstone.gemfire.internal.offheap;
+ 
+ import static org.junit.Assert.*;
+ 
+ import java.io.ByteArrayInputStream;
+ import java.io.DataInput;
+ import java.io.DataInputStream;
+ import java.io.IOException;
+ 
+ import org.junit.After;
+ import org.junit.Before;
+ import org.junit.Test;
+ import org.junit.experimental.categories.Category;
+ 
+ import com.gemstone.gemfire.DataSerializer;
+ import com.gemstone.gemfire.internal.HeapDataOutputStream;
+ import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+ import com.gemstone.gemfire.internal.offheap.NullOffHeapMemoryStats;
+ import com.gemstone.gemfire.internal.offheap.NullOutOfOffHeapMemoryListener;
+ import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl;
+ import com.gemstone.gemfire.internal.offheap.StoredObject;
+ import com.gemstone.gemfire.internal.offheap.UnsafeMemoryChunk;
+ import com.gemstone.gemfire.test.junit.categories.UnitTest;
+ 
+ @Category(UnitTest.class)
+ public class OffHeapWriteObjectAsByteArrayJUnitTest {
+ 
+   @Before
+   public void setUp() throws Exception {
 -    SimpleMemoryAllocatorImpl.create(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{new UnsafeMemoryChunk(1024*1024)});
++    SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{new UnsafeMemoryChunk(1024*1024)});
+   }
+ 
+   @After
+   public void tearDown() throws Exception {
+     SimpleMemoryAllocatorImpl.freeOffHeapMemory();
+   }
+   
+   private StoredObject createStoredObject(byte[] bytes, boolean isSerialized, boolean isCompressed) {
 -    return SimpleMemoryAllocatorImpl.getAllocator().allocateAndInitialize(bytes, isSerialized, isCompressed, null);
++    return SimpleMemoryAllocatorImpl.getAllocator().allocateAndInitialize(bytes, isSerialized, isCompressed);
+   }
+   
+   private DataInputStream createInput(HeapDataOutputStream hdos) {
+     ByteArrayInputStream bais = new ByteArrayInputStream(hdos.toByteArray());
+     return new DataInputStream(bais);
+   }
+   
+   @Test
+   public void testByteArrayChunk() throws IOException, ClassNotFoundException {
+     byte[] expected = new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
+     StoredObject so = createStoredObject(expected, false, false);
 -    assertTrue(so instanceof Chunk);
++    assertTrue(so instanceof ObjectChunk);
+     HeapDataOutputStream hdos = new HeapDataOutputStream(new byte[1024]);
+     DataSerializer.writeObjectAsByteArray(so, hdos);
+     DataInputStream in = createInput(hdos);
+     byte[] actual = DataSerializer.readByteArray(in);
+     assertArrayEquals(expected, actual);
+   }
+   
+   @Test
+   public void testByteArrayDataAsAddress() throws IOException, ClassNotFoundException {
+     byte[] expected = new byte[] {1, 2, 3};
+     StoredObject so = createStoredObject(expected, false, false);
+     assertTrue(so instanceof DataAsAddress);
+     HeapDataOutputStream hdos = new HeapDataOutputStream(new byte[1024]);
+     DataSerializer.writeObjectAsByteArray(so, hdos);
+     DataInputStream in = createInput(hdos);
+     byte[] actual = DataSerializer.readByteArray(in);
+     assertArrayEquals(expected, actual);
+   }
+   
+   @Test
+   public void testStringChunk() throws IOException, ClassNotFoundException {
+     byte[] expected = EntryEventImpl.serialize("1234567890");
+     StoredObject so = createStoredObject(expected, true, false);
 -    assertTrue(so instanceof Chunk);
++    assertTrue(so instanceof ObjectChunk);
+     HeapDataOutputStream hdos = new HeapDataOutputStream(new byte[1024]);
+     DataSerializer.writeObjectAsByteArray(so, hdos);
+     DataInputStream in = createInput(hdos);
+     byte[] actual = DataSerializer.readByteArray(in);
+     assertArrayEquals(expected, actual);
+     assertNoMoreInput(in);
+   }
+   
+   @Test
+   public void testStringDataAsAddress() throws IOException, ClassNotFoundException {
+     byte[] expected = EntryEventImpl.serialize("1234");
+     StoredObject so = createStoredObject(expected, true, false);
+     assertTrue(so instanceof DataAsAddress);
+     HeapDataOutputStream hdos = new HeapDataOutputStream(new byte[1024]);
+     DataSerializer.writeObjectAsByteArray(so, hdos);
+     DataInputStream in = createInput(hdos);
+     byte[] actual = DataSerializer.readByteArray(in);
+     assertArrayEquals(expected, actual);
+   }
+   
+   private void assertNoMoreInput(DataInputStream in) throws IOException {
+     assertEquals(0, in.available());
+   }
+ }
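
All four tests in this file exercise the same round trip, and the merge's visible
change here is that allocateAndInitialize() drops its trailing argument. A condensed
sketch of that round trip (not part of the diff; it assumes the allocator created in
setUp() above):

    byte[] expected = new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
    // store raw bytes off-heap; a value this size comes back as an ObjectChunk,
    // while a tiny value (e.g. 3 bytes) is returned inline as a DataAsAddress
    StoredObject so = SimpleMemoryAllocatorImpl.getAllocator()
        .allocateAndInitialize(expected, false /*isSerialized*/, false /*isCompressed*/);
    // write the stored object out as a byte array and read it back on-heap
    HeapDataOutputStream hdos = new HeapDataOutputStream(new byte[1024]);
    DataSerializer.writeObjectAsByteArray(so, hdos);
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(hdos.toByteArray()));
    byte[] actual = DataSerializer.readByteArray(in);
    assertArrayEquals(expected, actual);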

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/OldFreeListOffHeapRegionJUnitTest.java
----------------------------------------------------------------------
diff --cc geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/OldFreeListOffHeapRegionJUnitTest.java
index 0000000,6e26b2f..d8c35b8
mode 000000,100755..100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/OldFreeListOffHeapRegionJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/OldFreeListOffHeapRegionJUnitTest.java
@@@ -1,0 -1,47 +1,47 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package com.gemstone.gemfire.internal.offheap;
+ 
+ import org.junit.experimental.categories.Category;
+ 
+ import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+ 
+ @Category(IntegrationTest.class)
+ public class OldFreeListOffHeapRegionJUnitTest extends OffHeapRegionBase {
+ 
+   @Override
+   protected String getOffHeapMemorySize() {
+     return "20m";
+   }
+   
+   @Override
+   public void configureOffHeapStorage() {
+     System.setProperty("gemfire.OFF_HEAP_SLAB_SIZE", "1m");
+   }
+ 
+   @Override
+   public void unconfigureOffHeapStorage() {
+     System.clearProperty("gemfire.OFF_HEAP_TOTAL_SIZE");
+     System.clearProperty("gemfire.OFF_HEAP_SLAB_SIZE");
+   }
+ 
+   @Override
+   public int perObjectOverhead() {
 -    return Chunk.OFF_HEAP_HEADER_SIZE;
++    return ObjectChunk.OFF_HEAP_HEADER_SIZE;
+   }
+ 
+ }
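
The overrides above parameterize OffHeapRegionBase: a "20m" store carved into "1m"
slabs, with perObjectOverhead() presumably letting the base test account for the
ObjectChunk header each stored value costs. A hypothetical helper (not part of the
commit, ignoring alignment and fragmentation) that makes the overhead arithmetic
explicit:

    // Rough upper bound on how many values of a given size fit in the off-heap store.
    static long roughCapacity(long storeSizeBytes, int valueSizeBytes) {
      long perObject = ObjectChunk.OFF_HEAP_HEADER_SIZE + (long) valueSizeBytes;
      return storeSizeBytes / perObject;
    }

    // e.g. roughCapacity(20L * 1024 * 1024, 1024) for the "20m" store configured above,
    // which the "1m" OFF_HEAP_SLAB_SIZE setting splits into twenty 1 MB slabs.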

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorFillPatternIntegrationTest.java
----------------------------------------------------------------------
diff --cc geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorFillPatternIntegrationTest.java
index 0000000,239cbc8..51f46a1
mode 000000,100644..100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorFillPatternIntegrationTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorFillPatternIntegrationTest.java
@@@ -1,0 -1,246 +1,246 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package com.gemstone.gemfire.internal.offheap;
+ 
+ import static org.junit.Assert.*;
+ 
+ import java.util.Collections;
+ import java.util.LinkedList;
+ import java.util.List;
+ import java.util.Random;
+ import java.util.concurrent.CountDownLatch;
+ import java.util.concurrent.ThreadLocalRandom;
+ import java.util.concurrent.TimeUnit;
+ 
+ import org.junit.After;
+ import org.junit.Before;
+ import org.junit.Test;
+ import org.junit.experimental.categories.Category;
+ 
+ import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+ 
+ /**
+  * Tests fill pattern validation for the {@link SimpleMemoryAllocatorImpl}.
+  */
+ @Category(IntegrationTest.class)
+ public class SimpleMemoryAllocatorFillPatternIntegrationTest {
+   private static Random random = ThreadLocalRandom.current();
+ 
+   /**
+    * Chunk operation types.
+    */
+   static enum Operation {
+     ALLOCATE,
+     FREE,
+     WRITE;
+     
+     // Holds all Operation values
+     private static Operation[] values = Operation.values(); 
+     
+     static Operation randomOperation() {
+       return values[random.nextInt(values.length)];
+     }
+   };
+   
+   /** Number of worker threads for advanced tests. */
+   private static final int WORKER_THREAD_COUNT = 5;
+   
+   /** Size of single test slab.*/
+   private static final int SLAB_SIZE = 1024 * 1024 * 50;
+   
+   /** Maximum number of bytes a worker thread can allocate during advanced tests.  */
+   private static final int MAX_WORKER_ALLOCATION_TOTAL_SIZE = SLAB_SIZE / WORKER_THREAD_COUNT / 2;
+   
+   /** Maximum allocation for a single Chunk.  */
+   private static final int MAX_WORKER_ALLOCATION_SIZE = 512;
+   
+   /** Canned data for write operations. */
+   private static final byte[] WRITE_BYTES = new String("Some string data.").getBytes();
+   
+   /** Minimum size for write operations. */
+   private static final int MIN_WORKER_ALLOCATION_SIZE = WRITE_BYTES.length;
+ 
+   /** Runtime for worker threads. */
+   private static final long RUN_TIME_IN_MILLIS = 1 * 1000 * 5;
+   
+   /** Chunk size for basic huge allocation test. */
+   private static final int HUGE_CHUNK_SIZE = 1024 * 200;
+   
+   /** Our test victim. */
+   private SimpleMemoryAllocatorImpl allocator = null;
+   
+   /** Our test victim's memory slab. */
+   private UnsafeMemoryChunk slab = null;
+ 
+   /**
+    * Enables fill validation and creates the test victim.
+    */
+   @Before
+   public void setUp() throws Exception {
+     System.setProperty("gemfire.validateOffHeapWithFill", "true");
+     this.slab = new UnsafeMemoryChunk(SLAB_SIZE);
 -    this.allocator = SimpleMemoryAllocatorImpl.create(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{this.slab});
++    this.allocator = SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{this.slab});
+   }
+ 
+   /**
+    * Frees off heap memory.
+    */
+   @After
+   public void tearDown() throws Exception {
+     SimpleMemoryAllocatorImpl.freeOffHeapMemory();
+     System.clearProperty("gemfire.validateOffHeapWithFill");
+   }
+   
+   /**
+    * This test hammers a SimpleMemoryAllocatorImpl with multiple threads exercising
+    * the fill validation of tiny Chunks for one minute.  This, of course, exercises many aspects of
+    * the SimpleMemoryAllocatorImpl and its helper classes.
+    * @throws Exception
+    */
+   @Test
+   public void testFillPatternAdvancedForTinyAllocations() throws Exception { 
+     doFillPatternAdvancedTest(new ChunkSizer() {
+       @Override
+       public int allocationSize() {
+         int allocation = random.nextInt(MAX_WORKER_ALLOCATION_SIZE+1);
+         
+         while(allocation < MIN_WORKER_ALLOCATION_SIZE) {
+           allocation = random.nextInt(MAX_WORKER_ALLOCATION_SIZE+1);
+         }
+         return allocation;
+       }
+     });
+   }
+ 
+   /**
+    * Hammers a SimpleMemoryAllocatorImpl with multiple worker threads that exercise
+    * the fill validation of huge Chunks for RUN_TIME_IN_MILLIS.  This also exercises
+    * many aspects of the SimpleMemoryAllocatorImpl and its helper classes.
+    * @throws Exception
+    */
+   @Test
+   public void testFillPatternAdvancedForHugeAllocations() throws Exception {
+     doFillPatternAdvancedTest(new ChunkSizer() {
+       @Override
+       public int allocationSize() {
+         return HUGE_CHUNK_SIZE;
+       }
+     });
+   }
+   
+   private interface ChunkSizer {
+     int allocationSize();
+   }
+   
+   private void doFillPatternAdvancedTest(final ChunkSizer chunkSizer) throws InterruptedException {
+     // Used to manage worker thread completion
+     final CountDownLatch latch = new CountDownLatch(WORKER_THREAD_COUNT);
+     
+     // Use to track any errors the worker threads will encounter
+     final List<Throwable> threadErrorList = Collections.synchronizedList(new LinkedList<Throwable>());
+     
+     /*
+      * Start up a number of worker threads.  These threads will randomly allocate, free,
+      * and write to Chunks.
+      */
+     for(int i = 0;i < WORKER_THREAD_COUNT;++i) {
+       new Thread(new Runnable() {
+         // Total allocation in bytes for this thread
+         private int totalAllocation = 0;
+         
+         // List of Chunks allocated by this thread
 -        private List<Chunk> chunks = new LinkedList<Chunk>();
++        private List<ObjectChunk> chunks = new LinkedList<ObjectChunk>();
+         
+         // Time to end thread execution
+         private long endTime = System.currentTimeMillis() + RUN_TIME_IN_MILLIS;
+         
+         /**
+          * Allocates a chunk and adds it to the thread's Chunk list.
+          */
+         private void allocate() {          
+           int allocation = chunkSizer.allocationSize();
 -          Chunk chunk = (Chunk) allocator.allocate(allocation, null);
++          ObjectChunk chunk = (ObjectChunk) allocator.allocate(allocation);
+           
+           // This should always work just after allocation
+           chunk.validateFill();
+           
+           chunks.add(chunk);
+           totalAllocation += chunk.getSize();
+         }
+         
+         /**
+          * Frees a random chunk from the Chunk list.
+          */
+         private void free() {
 -          Chunk chunk = chunks.remove(random.nextInt(chunks.size()));
++          ObjectChunk chunk = chunks.remove(random.nextInt(chunks.size()));
+           totalAllocation -= chunk.getSize();
+           
+           /*
+            * Chunk is filled here but another thread may have already grabbed it so we
+            * cannot validate the fill.
+            */
+           chunk.release(); 
+         }
+         
+         /**
+          * Writes canned data to a random Chunk from the Chunk list.
+          */
+         private void write() {
 -          Chunk chunk = chunks.get(random.nextInt(chunks.size()));
++          ObjectChunk chunk = chunks.get(random.nextInt(chunks.size()));
+           chunk.writeBytes(0, WRITE_BYTES);
+         }
+         
+         /**
+          * Randomly selects Chunk operations and executes them
+          * for a period of time.  Collects any error thrown during execution.
+          */
+         @Override
+         public void run() {
+           try {
+             for(long currentTime = System.currentTimeMillis();currentTime < endTime;currentTime = System.currentTimeMillis()) {
+               Operation op = (totalAllocation == 0 ? Operation.ALLOCATE : (totalAllocation >= MAX_WORKER_ALLOCATION_TOTAL_SIZE ? Operation.FREE : Operation.randomOperation()));
+               switch(op) {
+               case ALLOCATE:
+                 allocate();
+                 break;
+               case FREE:
+                 free();
+                 break;
+               case WRITE:
+                 write();
+                 break;
+               }
+             }
+           } catch (Throwable t) {
+             threadErrorList.add(t);
+           } finally {
+             latch.countDown();
+           }
+        }        
+       }).start();
+     }
+     
+     // Make sure each thread ended cleanly
+     assertTrue(latch.await(2, TimeUnit.MINUTES));
+     
+     // Fail on the first error we find
+     if(!threadErrorList.isEmpty()) {
+       fail(threadErrorList.get(0).getMessage());
+     }
+   }
+   
+ }
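
Once gemfire.validateOffHeapWithFill is enabled, the worker logic above boils down
to a per-chunk lifecycle that the fill-pattern JUnit test following this file spells
out single-threaded. A condensed sketch of that lifecycle (not part of the diff; it
uses only calls shown in these two tests):

    System.setProperty("gemfire.validateOffHeapWithFill", "true");
    UnsafeMemoryChunk slab = new UnsafeMemoryChunk(1024 * 1024);
    SimpleMemoryAllocatorImpl allocator = SimpleMemoryAllocatorImpl.createForUnitTest(
        new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(),
        new UnsafeMemoryChunk[] {slab});
    try {
      ObjectChunk chunk = (ObjectChunk) allocator.allocate(1024);
      chunk.validateFill();                               // fresh chunk carries a valid fill
      chunk.writeBytes(ObjectChunk.MIN_CHUNK_SIZE + 1, "some data".getBytes()); // dirty it
      chunk.release();                                    // release re-applies the fill pattern
      chunk.validateFill();                               // so the freed chunk validates again
    } finally {
      SimpleMemoryAllocatorImpl.freeOffHeapMemory();
      System.clearProperty("gemfire.validateOffHeapWithFill");
    }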

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorFillPatternJUnitTest.java
----------------------------------------------------------------------
diff --cc geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorFillPatternJUnitTest.java
index 0000000,21c9835..7c26f86
mode 000000,100644..100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorFillPatternJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorFillPatternJUnitTest.java
@@@ -1,0 -1,183 +1,183 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package com.gemstone.gemfire.internal.offheap;
+ 
+ import static org.junit.Assert.*;
+ import static com.googlecode.catchexception.CatchException.*;
+ 
+ import org.junit.After;
+ import org.junit.Before;
+ import org.junit.Test;
+ import org.junit.experimental.categories.Category;
+ 
+ import com.gemstone.gemfire.test.junit.categories.UnitTest;
+ 
+ import junit.framework.TestCase;
+ 
+ /**
+  * Tests fill pattern validation for the {@link SimpleMemoryAllocatorImpl}.
+  * @author rholmes
+  */
+ @Category(UnitTest.class)
+ public class SimpleMemoryAllocatorFillPatternJUnitTest {
+   
+   /** Size of single test slab.*/
+   private static final int SLAB_SIZE = 1024 * 1024 * 50;
+   
+   /** Canned data for write operations. */
+   private static final byte[] WRITE_BYTES = new String("Some string data.").getBytes();
+   
+   /** Chunk size for basic huge allocation test. */
+   private static final int HUGE_CHUNK_SIZE = 1024 * 200;
+   
+   /** The number of chunks to allocate in order to force compaction. */
+   private static final int COMPACTION_CHUNKS = 3;
+   
+   /** Our slab size divided in three (with some padding for safety). */
+   private static final int COMPACTION_CHUNK_SIZE = (SLAB_SIZE / COMPACTION_CHUNKS) - 1024;
+   
+   /** This should force compaction when allocated. */
+   private static final int FORCE_COMPACTION_CHUNK_SIZE = COMPACTION_CHUNK_SIZE * 2;
+ 
+   /** Our test victim. */
+   private SimpleMemoryAllocatorImpl allocator = null;
+   
+   /** Our test victim's memory slab. */
+   private UnsafeMemoryChunk slab = null;
+ 
+   /**
+    * Enables fill validation and creates the test victim.
+    */
+   @Before
+   public void setUp() throws Exception {
+     System.setProperty("gemfire.validateOffHeapWithFill", "true");
+     this.slab = new UnsafeMemoryChunk(SLAB_SIZE);
 -    this.allocator = SimpleMemoryAllocatorImpl.create(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{this.slab});
++    this.allocator = SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{this.slab});
+   }
+ 
+   /**
+    * Frees off heap memory.
+    */
+   @After
+   public void tearDown() throws Exception {
+     SimpleMemoryAllocatorImpl.freeOffHeapMemory();
+     System.clearProperty("gemfire.validateOffHeapWithFill");
+   }
+ 
+   /**
+    * This tests the fill pattern for a single tiny Chunk allocation.
+    * @throws Exception
+    */
+   @Test
+   public void testFillPatternBasicForTinyAllocations() throws Exception {
+     doFillPatternBasic(1024);
+   }
+   
+   /**
+    * This tests the fill pattern for a single huge Chunk allocation.
+    * @throws Exception
+    */
+   @Test
+   public void testFillPatternBasicForHugeAllocations() throws Exception {
+     doFillPatternBasic(HUGE_CHUNK_SIZE);
+   }
+   
+   private void doFillPatternBasic(final int chunkSize) {
+     /*
+      * Pull a chunk off the fragment.  This will have no fill because
+      * it is a "fresh" chunk.
+      */
 -    Chunk chunk = (Chunk) this.allocator.allocate(chunkSize, null);
++    ObjectChunk chunk = (ObjectChunk) this.allocator.allocate(chunkSize);
+ 
+     /*
+      * Chunk should have valid fill from initial fragment allocation.
+      */
+     chunk.validateFill();
+          
+     // "Dirty" the chunk so the release has something to fill over
 -    chunk.writeBytes(Chunk.MIN_CHUNK_SIZE + 1, WRITE_BYTES);
++    chunk.writeBytes(ObjectChunk.MIN_CHUNK_SIZE + 1, WRITE_BYTES);
+ 
+     // This should free the Chunk (ref count == 1)
+     chunk.release();
+ 
+     /*
+      * This chunk should have a fill because it was reused from the
+      * free list (assuming no fragmentation at this point...)
+      */
 -    chunk = (Chunk) this.allocator.allocate(chunkSize, null);
++    chunk = (ObjectChunk) this.allocator.allocate(chunkSize);
+     
+     // Make sure we have a fill this time
+     chunk.validateFill();
+     
+     // Give the fill code something to write over during the release
 -    chunk.writeBytes(Chunk.MIN_CHUNK_SIZE + 1, WRITE_BYTES);
++    chunk.writeBytes(ObjectChunk.MIN_CHUNK_SIZE + 1, WRITE_BYTES);
+     chunk.release();
+ 
+     // Again, make sure the release implemented the fill
+     chunk.validateFill();
+ 
+     // "Dirty up" the free chunk
 -    chunk.writeBytes(Chunk.MIN_CHUNK_SIZE + 1, WRITE_BYTES);
++    chunk.writeBytes(ObjectChunk.MIN_CHUNK_SIZE + 1, WRITE_BYTES);
+     
+     catchException(chunk).validateFill();
+     assertTrue(caughtException() instanceof IllegalStateException);
+     assertEquals("Fill pattern violated for chunk " + chunk.getMemoryAddress() + " with size " + chunk.getSize(), caughtException().getMessage());
+     
+   }
+ 
+   /**
+    * This tests that fill validation is working properly on newly created fragments after
+    * a compaction.
+    * @throws Exception
+    */
+   @Test
+   public void testFillPatternAfterCompaction() throws Exception {
+     /*
+      * Stores our allocated memory.
+      */
 -    Chunk[] allocatedChunks = new Chunk[COMPACTION_CHUNKS];
++    ObjectChunk[] allocatedChunks = new ObjectChunk[COMPACTION_CHUNKS];
+     
+     /*
+      * Use up most of our memory
+      * Our memory looks like [      ][      ][      ]
+      */
+     for(int i =0;i < allocatedChunks.length;++i) {
 -      allocatedChunks[i] = (Chunk) this.allocator.allocate(COMPACTION_CHUNK_SIZE, null);
++      allocatedChunks[i] = (ObjectChunk) this.allocator.allocate(COMPACTION_CHUNK_SIZE);
+       allocatedChunks[i].validateFill();
+     }
+ 
+     /*
+      * Release some of our allocated chunks.
+      */
+     for(int i=0;i < 2;++i) {
+       allocatedChunks[i].release();
+       allocatedChunks[i].validateFill();      
+     }
+     
+     /*
+      * Now, allocate another chunk that is slightly larger than one of
+      * our initial chunks.  This should force a compaction causing our
+      * memory to look like [            ][      ].
+      */
 -    Chunk slightlyLargerChunk = (Chunk) this.allocator.allocate(FORCE_COMPACTION_CHUNK_SIZE, null);
++    ObjectChunk slightlyLargerChunk = (ObjectChunk) this.allocator.allocate(FORCE_COMPACTION_CHUNK_SIZE);
+     
+     /*
+      * Make sure the compacted memory has the fill validation.
+      */
+     slightlyLargerChunk.validateFill();
+   }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorJUnitTest.java
----------------------------------------------------------------------
diff --cc geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorJUnitTest.java
index 0000000,d9979cc..1f17f9b
mode 000000,100644..100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorJUnitTest.java
@@@ -1,0 -1,675 +1,631 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package com.gemstone.gemfire.internal.offheap;
+ 
+ import static org.junit.Assert.*;
+ 
+ import java.nio.ByteBuffer;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collections;
+ import java.util.concurrent.atomic.AtomicReference;
+ 
+ import org.junit.Rule;
+ import org.junit.Test;
+ import org.junit.contrib.java.lang.system.RestoreSystemProperties;
+ import org.junit.experimental.categories.Category;
+ 
+ import com.gemstone.gemfire.OutOfOffHeapMemoryException;
+ import com.gemstone.gemfire.cache.CacheClosedException;
+ import com.gemstone.gemfire.internal.logging.NullLogWriter;
 -import com.gemstone.gemfire.internal.offheap.UnsafeMemoryChunk.Factory;
+ import com.gemstone.gemfire.test.junit.categories.UnitTest;
+ 
+ @Category(UnitTest.class)
+ public class SimpleMemoryAllocatorJUnitTest {
+   @Rule
+   public final RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();
+ 
+   private static int round(int multiple, int v) {
+     return ((v+multiple-1)/multiple)*multiple;
+   }
+   @Test
+   public void testNullGetAllocator() {
+     try {
+       SimpleMemoryAllocatorImpl.getAllocator();
+       fail("expected CacheClosedException");
+     } catch (CacheClosedException expected) {
+     }
+   }
+   @Test
+   public void testConstructor() {
+     try {
 -      SimpleMemoryAllocatorImpl.create(null, null, null);
++      SimpleMemoryAllocatorImpl.createForUnitTest(null, null, null);
+       fail("expected IllegalArgumentException");
+     } catch (IllegalArgumentException expected) {
+     }
 -    try {
 -      SimpleMemoryAllocatorImpl.create(new NullOutOfOffHeapMemoryListener(), null, null, -1, 0, 0, 0);
 -      fail("expected IllegalStateException");
 -    } catch (IllegalStateException expected) {
 -      assertEquals(true, expected.getMessage().contains("gemfire.OFF_HEAP_ALIGNMENT must be a multiple of 8"));
 -    }
 -    try {
 -      SimpleMemoryAllocatorImpl.create(new NullOutOfOffHeapMemoryListener(), null, null, 9, 0, 0, 0);
 -      fail("expected IllegalStateException");
 -    } catch (IllegalStateException expected) {
 -      assertEquals(true, expected.getMessage().contains("gemfire.OFF_HEAP_ALIGNMENT must be a multiple of 8"));
 -    }
 -    try {
 -      SimpleMemoryAllocatorImpl.create(new NullOutOfOffHeapMemoryListener(), null, null, 256+8, 0, 0, 0);
 -      fail("expected IllegalStateException");
 -    } catch (IllegalStateException expected) {
 -      assertEquals(true, expected.getMessage().contains("gemfire.OFF_HEAP_ALIGNMENT must be <= 256"));
 -    }
 -    try {
 -      SimpleMemoryAllocatorImpl.create(new NullOutOfOffHeapMemoryListener(), null, null, 8, 0, 0, 0);
 -      fail("expected IllegalStateException");
 -    } catch (IllegalStateException expected) {
 -      assertEquals(true, expected.getMessage().contains("gemfire.OFF_HEAP_BATCH_ALLOCATION_SIZE must be >= 1."));
 -    }
 -    try {
 -      SimpleMemoryAllocatorImpl.create(new NullOutOfOffHeapMemoryListener(), null, null, 8, 1, 0, 0);
 -      fail("expected IllegalStateException");
 -    } catch (IllegalStateException expected) {
 -      assertEquals(true, expected.getMessage().contains("gemfire.OFF_HEAP_FREE_LIST_COUNT must be >= 1."));
 -    }
 -    try {
 -      SimpleMemoryAllocatorImpl.create(new NullOutOfOffHeapMemoryListener(), null, null, 8, 1, 1, -1);
 -      fail("expected IllegalStateException");
 -    } catch (IllegalStateException expected) {
 -      assertEquals(true, expected.getMessage().contains("HUGE_MULTIPLE must be >= 0 and <= 256 but it was -1"));
 -    }
 -    try {
 -      SimpleMemoryAllocatorImpl.create(new NullOutOfOffHeapMemoryListener(), null, null, 8, 1, 1, 257);
 -      fail("expected IllegalStateException");
 -    } catch (IllegalStateException expected) {
 -      assertEquals(true, expected.getMessage().contains("HUGE_MULTIPLE must be >= 0 and <= 256 but it was 257"));
 -    }
 -     
+   }
+   /**
+    * Logger that remembers the last severe message
+    */
+   private static class LastSevereLogger extends NullLogWriter {
+     private String lastSevereMessage;
+     private Throwable lastSevereThrowable;
+     
+     private void setLastSevere(String msg, Throwable ex) {
+       this.lastSevereMessage = msg;
+       this.lastSevereThrowable = ex;
+     }
+     public String getLastSevereMessage() {
+       return this.lastSevereMessage;
+     }
+     public Throwable getLastSevereThrowable() {
+       return this.lastSevereThrowable;
+     }
+     @Override
+     public void severe(String msg, Throwable ex) {
+       setLastSevere(msg, ex);
+     }
+     @Override
+     public void severe(String msg) {
+       setLastSevere(msg, null);
+     }
+     @Override
+     public void severe(Throwable ex) {
+       setLastSevere(null, ex);
+     }
+   }
+   @Test
+   public void testCreate() {
+     System.setProperty(SimpleMemoryAllocatorImpl.FREE_OFF_HEAP_MEMORY_PROPERTY, "false");
+     {
+       NullOutOfOffHeapMemoryListener listener = new NullOutOfOffHeapMemoryListener();
+       NullOffHeapMemoryStats stats = new NullOffHeapMemoryStats();
+       LastSevereLogger logger = new LastSevereLogger();
+       try {
 -        SimpleMemoryAllocatorImpl.create(listener, stats, logger, 10, 950, 100,
 -            new UnsafeMemoryChunk.Factory() {
++        SimpleMemoryAllocatorImpl.createForUnitTest(listener, stats, logger, 10, 950, 100,
++            new AddressableMemoryChunkFactory() {
+           @Override
 -          public UnsafeMemoryChunk create(int size) {
++          public AddressableMemoryChunk create(int size) {
+             throw new OutOfMemoryError("expected");
+           }
+         });
+       } catch (OutOfMemoryError expected) {
+       }
+       assertTrue(listener.isClosed());
+       assertTrue(stats.isClosed());
+       assertEquals(null, logger.getLastSevereThrowable());
+       assertEquals(null, logger.getLastSevereMessage());
+     }
+     {
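+       // slab creation fails after one slab succeeds: the partial allocation is reported through a severe log message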
+       NullOutOfOffHeapMemoryListener listener = new NullOutOfOffHeapMemoryListener();
+       NullOffHeapMemoryStats stats = new NullOffHeapMemoryStats();
+       LastSevereLogger logger = new LastSevereLogger();
+       int MAX_SLAB_SIZE = 100;
+       try {
 -        Factory factory = new UnsafeMemoryChunk.Factory() {
++        AddressableMemoryChunkFactory factory = new AddressableMemoryChunkFactory() {
+           private int createCount = 0;
+           @Override
 -          public UnsafeMemoryChunk create(int size) {
++          public AddressableMemoryChunk create(int size) {
+             createCount++;
+             if (createCount == 1) {
+               return new UnsafeMemoryChunk(size);
+             } else {
+               throw new OutOfMemoryError("expected");
+             }
+           }
+         };
 -        SimpleMemoryAllocatorImpl.create(listener, stats, logger, 10, 950, MAX_SLAB_SIZE, factory);
++        SimpleMemoryAllocatorImpl.createForUnitTest(listener, stats, logger, 10, 950, MAX_SLAB_SIZE, factory);
+       } catch (OutOfMemoryError expected) {
+       }
+       assertTrue(listener.isClosed());
+       assertTrue(stats.isClosed());
+       assertEquals(null, logger.getLastSevereThrowable());
+       assertEquals("Off-heap memory creation failed after successfully allocating " + MAX_SLAB_SIZE + " bytes of off-heap memory.", logger.getLastSevereMessage());
+     }
+     {
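+       // creation succeeds; closing without freeing lets a later createForUnitTest() return the same allocator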
+       NullOutOfOffHeapMemoryListener listener = new NullOutOfOffHeapMemoryListener();
+       NullOffHeapMemoryStats stats = new NullOffHeapMemoryStats();
 -      Factory factory = new UnsafeMemoryChunk.Factory() {
++      AddressableMemoryChunkFactory factory = new AddressableMemoryChunkFactory() {
+         @Override
 -        public UnsafeMemoryChunk create(int size) {
++        public AddressableMemoryChunk create(int size) {
+           return new UnsafeMemoryChunk(size);
+         }
+       };
+       MemoryAllocator ma = 
 -        SimpleMemoryAllocatorImpl.create(listener, stats, new NullLogWriter(), 10, 950, 100, factory);
++        SimpleMemoryAllocatorImpl.createForUnitTest(listener, stats, new NullLogWriter(), 10, 950, 100, factory);
+       try {
+         assertFalse(listener.isClosed());
+         assertFalse(stats.isClosed());
+         ma.close();
+         assertTrue(listener.isClosed());
+         assertFalse(stats.isClosed());
+         listener = new NullOutOfOffHeapMemoryListener();
+         NullOffHeapMemoryStats stats2 = new NullOffHeapMemoryStats();
+         {
+           UnsafeMemoryChunk slab = new UnsafeMemoryChunk(1024);
+           try {
 -            SimpleMemoryAllocatorImpl.create(listener, stats2, new UnsafeMemoryChunk[]{slab});
++            SimpleMemoryAllocatorImpl.createForUnitTest(listener, stats2, new UnsafeMemoryChunk[]{slab});
+           } catch (IllegalStateException expected) {
+             assertTrue("unexpected message: " + expected.getMessage(), 
+                 expected.getMessage().equals("attempted to reuse existing off-heap memory even though new off-heap memory was allocated"));
+           } finally {
+             slab.release();
+           }
+           assertFalse(stats.isClosed());
+           assertTrue(listener.isClosed());
+           assertTrue(stats2.isClosed());
+         }
+         listener = new NullOutOfOffHeapMemoryListener();
+         stats2 = new NullOffHeapMemoryStats();
 -        MemoryAllocator ma2 = SimpleMemoryAllocatorImpl.create(listener, stats2, new NullLogWriter(), 10, 950, 100, factory);
++        MemoryAllocator ma2 = SimpleMemoryAllocatorImpl.createForUnitTest(listener, stats2, new NullLogWriter(), 10, 950, 100, factory);
+         assertSame(ma, ma2);
+         assertTrue(stats.isClosed());
+         assertFalse(listener.isClosed());
+         assertFalse(stats2.isClosed());
+         stats = stats2;
+         ma.close();
+         assertTrue(listener.isClosed());
+         assertFalse(stats.isClosed());
+       } finally {
+         SimpleMemoryAllocatorImpl.freeOffHeapMemory();
+       }
+       assertTrue(stats.isClosed());
+     }
+   }
+   @Test
+   public void testBasics() {
 -    int BATCH_SIZE = com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.BATCH_SIZE;
 -    int TINY_MULTIPLE = com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.TINY_MULTIPLE;
 -    int HUGE_MULTIPLE = com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.HUGE_MULTIPLE;
 -    int perObjectOverhead = com.gemstone.gemfire.internal.offheap.Chunk.OFF_HEAP_HEADER_SIZE;
 -    int maxTiny = com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.MAX_TINY-perObjectOverhead;
++    int BATCH_SIZE = 1;
++    int TINY_MULTIPLE = com.gemstone.gemfire.internal.offheap.FreeListManager.TINY_MULTIPLE;
++    int HUGE_MULTIPLE = com.gemstone.gemfire.internal.offheap.FreeListManager.HUGE_MULTIPLE;
++    int perObjectOverhead = com.gemstone.gemfire.internal.offheap.ObjectChunk.OFF_HEAP_HEADER_SIZE;
++    int maxTiny = com.gemstone.gemfire.internal.offheap.FreeListManager.MAX_TINY-perObjectOverhead;
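+     // data sizes up to maxTiny are tracked on the tiny free lists; minHuge and larger go to the huge free list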
+     int minHuge = maxTiny+1;
+     int TOTAL_MEM = (maxTiny+perObjectOverhead)*BATCH_SIZE /*+ (maxBig+perObjectOverhead)*BATCH_SIZE*/ + round(TINY_MULTIPLE, minHuge+1+perObjectOverhead)*BATCH_SIZE + (TINY_MULTIPLE+perObjectOverhead)*BATCH_SIZE /*+ (MIN_BIG_SIZE+perObjectOverhead)*BATCH_SIZE*/ + round(TINY_MULTIPLE, minHuge+perObjectOverhead+1);
+     UnsafeMemoryChunk slab = new UnsafeMemoryChunk(TOTAL_MEM);
+     try {
 -      SimpleMemoryAllocatorImpl ma = SimpleMemoryAllocatorImpl.create(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{slab});
++      SimpleMemoryAllocatorImpl ma = SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{slab});
+       assertEquals(TOTAL_MEM, ma.getFreeMemory());
+       assertEquals(TOTAL_MEM, ma.freeList.getFreeFragmentMemory());
+       assertEquals(0, ma.freeList.getFreeTinyMemory());
+       assertEquals(0, ma.freeList.getFreeHugeMemory());
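+       // the first allocations are carved from the slab fragment; releasing them feeds the tiny and huge free lists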
 -      MemoryChunk tinymc = ma.allocate(maxTiny, null);
++      MemoryChunk tinymc = ma.allocate(maxTiny);
+       assertEquals(TOTAL_MEM-round(TINY_MULTIPLE, maxTiny+perObjectOverhead), ma.getFreeMemory());
+       assertEquals(round(TINY_MULTIPLE, maxTiny+perObjectOverhead)*(BATCH_SIZE-1), ma.freeList.getFreeTinyMemory());
 -      MemoryChunk hugemc = ma.allocate(minHuge, null);
++      MemoryChunk hugemc = ma.allocate(minHuge);
+       assertEquals(TOTAL_MEM-round(TINY_MULTIPLE, minHuge+perObjectOverhead)/*-round(BIG_MULTIPLE, maxBig+perObjectOverhead)*/-round(TINY_MULTIPLE, maxTiny+perObjectOverhead), ma.getFreeMemory());
+       long freeSlab = ma.freeList.getFreeFragmentMemory();
+       long oldFreeHugeMemory = ma.freeList.getFreeHugeMemory();
+       assertEquals(round(TINY_MULTIPLE, minHuge+perObjectOverhead)*(BATCH_SIZE-1), oldFreeHugeMemory);
+       hugemc.release();
+       assertEquals(round(TINY_MULTIPLE, minHuge+perObjectOverhead), ma.freeList.getFreeHugeMemory()-oldFreeHugeMemory);
+       assertEquals(TOTAL_MEM/*-round(BIG_MULTIPLE, maxBig+perObjectOverhead)*/-round(TINY_MULTIPLE, maxTiny+perObjectOverhead), ma.getFreeMemory());
+       assertEquals(TOTAL_MEM-round(TINY_MULTIPLE, maxTiny+perObjectOverhead), ma.getFreeMemory());
+       long oldFreeTinyMemory = ma.freeList.getFreeTinyMemory();
+       tinymc.release();
+       assertEquals(round(TINY_MULTIPLE, maxTiny+perObjectOverhead), ma.freeList.getFreeTinyMemory()-oldFreeTinyMemory);
+       assertEquals(TOTAL_MEM, ma.getFreeMemory());
+       // now let's reallocate from the free lists
 -      tinymc = ma.allocate(maxTiny, null);
++      tinymc = ma.allocate(maxTiny);
+       assertEquals(oldFreeTinyMemory, ma.freeList.getFreeTinyMemory());
+       assertEquals(TOTAL_MEM-round(TINY_MULTIPLE, maxTiny+perObjectOverhead), ma.getFreeMemory());
 -      hugemc = ma.allocate(minHuge, null);
++      hugemc = ma.allocate(minHuge);
+       assertEquals(oldFreeHugeMemory, ma.freeList.getFreeHugeMemory());
+       assertEquals(TOTAL_MEM-round(TINY_MULTIPLE, minHuge+perObjectOverhead)/*-round(BIG_MULTIPLE, maxBig+perObjectOverhead)*/-round(TINY_MULTIPLE, maxTiny+perObjectOverhead), ma.getFreeMemory());
+       hugemc.release();
+       assertEquals(round(TINY_MULTIPLE, minHuge+perObjectOverhead), ma.freeList.getFreeHugeMemory()-oldFreeHugeMemory);
+       assertEquals(TOTAL_MEM/*-round(BIG_MULTIPLE, maxBig+perObjectOverhead)*/-round(TINY_MULTIPLE, maxTiny+perObjectOverhead), ma.getFreeMemory());
+       assertEquals(TOTAL_MEM-round(TINY_MULTIPLE, maxTiny+perObjectOverhead), ma.getFreeMemory());
+       tinymc.release();
+       assertEquals(round(TINY_MULTIPLE, maxTiny+perObjectOverhead), ma.freeList.getFreeTinyMemory()-oldFreeTinyMemory);
+       assertEquals(TOTAL_MEM, ma.getFreeMemory());
+       // None of the reallocates should have come from the slab.
+       assertEquals(freeSlab, ma.freeList.getFreeFragmentMemory());
 -      tinymc = ma.allocate(1, null);
++      tinymc = ma.allocate(1);
+       assertEquals(round(TINY_MULTIPLE, 1+perObjectOverhead), tinymc.getSize());
+       assertEquals(freeSlab-(round(TINY_MULTIPLE, 1+perObjectOverhead)*BATCH_SIZE), ma.freeList.getFreeFragmentMemory());
+       freeSlab = ma.freeList.getFreeFragmentMemory();
+       tinymc.release();
+       assertEquals(round(TINY_MULTIPLE, maxTiny+perObjectOverhead)+(round(TINY_MULTIPLE, 1+perObjectOverhead)*BATCH_SIZE), ma.freeList.getFreeTinyMemory()-oldFreeTinyMemory);
+       
 -      hugemc = ma.allocate(minHuge+1, null);
++      hugemc = ma.allocate(minHuge+1);
+       assertEquals(round(TINY_MULTIPLE, minHuge+1+perObjectOverhead), hugemc.getSize());
+       assertEquals(round(TINY_MULTIPLE, minHuge+perObjectOverhead)*(BATCH_SIZE-1), ma.freeList.getFreeHugeMemory());
+       hugemc.release();
+       assertEquals(round(TINY_MULTIPLE, minHuge+perObjectOverhead)*BATCH_SIZE, ma.freeList.getFreeHugeMemory());
 -      hugemc = ma.allocate(minHuge, null);
++      hugemc = ma.allocate(minHuge);
+       assertEquals(round(TINY_MULTIPLE, minHuge+perObjectOverhead)*(BATCH_SIZE-1), ma.freeList.getFreeHugeMemory());
+       if (BATCH_SIZE > 1) {
 -        MemoryChunk hugemc2 = ma.allocate(minHuge, null);
++        MemoryChunk hugemc2 = ma.allocate(minHuge);
+         assertEquals(round(TINY_MULTIPLE, minHuge+perObjectOverhead)*(BATCH_SIZE-2), ma.freeList.getFreeHugeMemory());
+         hugemc2.release();
+         assertEquals(round(TINY_MULTIPLE, minHuge+perObjectOverhead)*(BATCH_SIZE-1), ma.freeList.getFreeHugeMemory());
+       }
+       hugemc.release();
+       assertEquals(round(TINY_MULTIPLE, minHuge+perObjectOverhead)*BATCH_SIZE, ma.freeList.getFreeHugeMemory());
+       // now that the allocator compacts free memory, the following allocate works.
 -      hugemc = ma.allocate(minHuge + HUGE_MULTIPLE + HUGE_MULTIPLE-1, null);
++      hugemc = ma.allocate(minHuge + HUGE_MULTIPLE + HUGE_MULTIPLE-1);
+     } finally {
+       SimpleMemoryAllocatorImpl.freeOffHeapMemory();
+     }
+   }
+   
+   @Test
+   public void testChunkCreateDirectByteBuffer() {
+     UnsafeMemoryChunk slab = new UnsafeMemoryChunk(1024*1024);
+     try {
 -      SimpleMemoryAllocatorImpl ma = SimpleMemoryAllocatorImpl.create(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{slab});
++      SimpleMemoryAllocatorImpl ma = SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{slab});
+       ByteBuffer bb = ByteBuffer.allocate(1024);
+       for (int i=0; i < 1024; i++) {
+         bb.put((byte) i);
+       }
+       bb.position(0);
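+       // copy the heap buffer into an off-heap chunk, then read it back through a direct ByteBuffer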
 -      Chunk c = (Chunk) ma.allocateAndInitialize(bb.array(), false, false, null);
++      ObjectChunk c = (ObjectChunk) ma.allocateAndInitialize(bb.array(), false, false);
+       assertEquals(1024, c.getDataSize());
+       if (!Arrays.equals(bb.array(), c.getRawBytes())) {
+         fail("arrays are not equal. Expected " + Arrays.toString(bb.array()) + " but found: " + Arrays.toString(c.getRawBytes()));
+       }
+       ByteBuffer dbb = c.createDirectByteBuffer();
+       assertEquals(true, dbb.isDirect());
+       assertEquals(bb, dbb);
+     } finally {
+       SimpleMemoryAllocatorImpl.freeOffHeapMemory();
+     }
+   }
+   
+   @Test
+   public void testDebugLog() {
+     SimpleMemoryAllocatorImpl.debugLog("test debug log", false);
+     SimpleMemoryAllocatorImpl.debugLog("test debug log", true);
+   }
+   @Test
+   public void testGetLostChunks() {
+     UnsafeMemoryChunk slab = new UnsafeMemoryChunk(1024*1024);
+     try {
 -      SimpleMemoryAllocatorImpl ma = SimpleMemoryAllocatorImpl.create(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{slab});
++      SimpleMemoryAllocatorImpl ma = SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{slab});
+       assertEquals(Collections.emptyList(), ma.getLostChunks());
+     } finally {
+       SimpleMemoryAllocatorImpl.freeOffHeapMemory();
+     }
+   }
+   @Test
+   public void testFindSlab() {
+     final int SLAB_SIZE = 1024*1024;
+     UnsafeMemoryChunk slab = new UnsafeMemoryChunk(SLAB_SIZE);
+     try {
 -      SimpleMemoryAllocatorImpl ma = SimpleMemoryAllocatorImpl.create(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{slab});
++      SimpleMemoryAllocatorImpl ma = SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{slab});
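+       // addresses inside the slab resolve to slab index 0; addresses just outside it are rejected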
+       assertEquals(0, ma.findSlab(slab.getMemoryAddress()));
+       assertEquals(0, ma.findSlab(slab.getMemoryAddress()+SLAB_SIZE-1));
+       try {
+         ma.findSlab(slab.getMemoryAddress()-1);
+         fail("expected IllegalStateException");
+       } catch (IllegalStateException expected) {
+       }
+       try {
+         ma.findSlab(slab.getMemoryAddress()+SLAB_SIZE);
+         fail("expected IllegalStateException");
+       } catch (IllegalStateException expected) {
+       }
+     } finally {
+       SimpleMemoryAllocatorImpl.freeOffHeapMemory();
+     }
+   }
+   @Test
+   public void testValidateAddressAndSize() {
+     final int SLAB_SIZE = 1024*1024;
+     UnsafeMemoryChunk slab = new UnsafeMemoryChunk(SLAB_SIZE);
+     try {
 -      SimpleMemoryAllocatorImpl ma = SimpleMemoryAllocatorImpl.create(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{slab});
++      SimpleMemoryAllocatorImpl ma = SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{slab});
+       try {
+         SimpleMemoryAllocatorImpl.validateAddress(0L);
+         fail("expected IllegalStateException");
+       } catch (IllegalStateException expected) {
+         assertEquals("Unexpected exception message: " + expected.getMessage(), true, expected.getMessage().contains("addr was smaller than expected"));
+       }
+       try {
+         SimpleMemoryAllocatorImpl.validateAddress(1L);
+         fail("expected IllegalStateException");
+       } catch (IllegalStateException expected) {
+         assertEquals("Unexpected exception message: " + expected.getMessage(), true, expected.getMessage().contains("Valid addresses must be in one of the following ranges:"));
+       }
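+       // addresses and sizes that stay within the slab pass validation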
+       SimpleMemoryAllocatorImpl.validateAddressAndSizeWithinSlab(slab.getMemoryAddress(), SLAB_SIZE, false);
+       SimpleMemoryAllocatorImpl.validateAddressAndSizeWithinSlab(slab.getMemoryAddress(), SLAB_SIZE, true);
+       SimpleMemoryAllocatorImpl.validateAddressAndSizeWithinSlab(slab.getMemoryAddress(), -1, true);
+       try {
+         SimpleMemoryAllocatorImpl.validateAddressAndSizeWithinSlab(slab.getMemoryAddress()-1, SLAB_SIZE, true);
+         fail("expected IllegalStateException");
+       } catch (IllegalStateException expected) {
+         assertEquals("Unexpected exception message: " + expected.getMessage(), true, expected.getMessage().equals(" address 0x" + Long.toString(slab.getMemoryAddress()-1, 16) + " does not address the original slab memory"));
+       }
+       try {
+         SimpleMemoryAllocatorImpl.validateAddressAndSizeWithinSlab(slab.getMemoryAddress(), SLAB_SIZE+1, true);
+         fail("expected IllegalStateException");
+       } catch (IllegalStateException expected) {
+         assertEquals("Unexpected exception message: " + expected.getMessage(), true, expected.getMessage().equals(" address 0x" + Long.toString(slab.getMemoryAddress()+SLAB_SIZE, 16) + " does not address the original slab memory"));
+       }
+     } finally {
+       SimpleMemoryAllocatorImpl.freeOffHeapMemory();
+     }
+   }
+   @Test
+   public void testMemoryInspection() {
+     final int SLAB_SIZE = 1024*1024;
+     UnsafeMemoryChunk slab = new UnsafeMemoryChunk(SLAB_SIZE);
+     try {
 -      SimpleMemoryAllocatorImpl ma = SimpleMemoryAllocatorImpl.create(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{slab});
++      SimpleMemoryAllocatorImpl ma = SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{slab});
+       MemoryInspector inspector = ma.getMemoryInspector();
+       assertNotNull(inspector);
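+       // the inspector reports no blocks until createSnapshot() is called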
+       assertEquals(null, inspector.getFirstBlock());
+       assertEquals(Collections.emptyList(), inspector.getSnapshot());
+       assertEquals(Collections.emptyList(), inspector.getAllocatedBlocks());
+       assertEquals(null, inspector.getBlockAfter(null));
+       inspector.createSnapshot();
+       // call this twice for code coverage
+       inspector.createSnapshot();
+       try {
+         assertEquals(inspector.getAllBlocks(), inspector.getSnapshot());
+         MemoryBlock firstBlock = inspector.getFirstBlock();
+         assertNotNull(firstBlock);
+         assertEquals(1024*1024, firstBlock.getBlockSize());
+         assertEquals("N/A", firstBlock.getDataType());
+         assertEquals(-1, firstBlock.getFreeListId());
+         assertTrue(firstBlock.getMemoryAddress() > 0);
+         assertNull(firstBlock.getNextBlock());
+         assertEquals(0, firstBlock.getRefCount());
+         assertEquals(0, firstBlock.getSlabId());
+         assertEquals(MemoryBlock.State.UNUSED, firstBlock.getState());
+         assertFalse(firstBlock.isCompressed());
+         assertFalse(firstBlock.isSerialized());
+         assertEquals(null, inspector.getBlockAfter(firstBlock));
+       } finally {
+         inspector.clearSnapshot();
+       }
+     } finally {
+       SimpleMemoryAllocatorImpl.freeOffHeapMemory();
+     }
+   }
+ 
+   @Test
+   public void testClose() {
+     System.setProperty(SimpleMemoryAllocatorImpl.FREE_OFF_HEAP_MEMORY_PROPERTY, "false");
+     UnsafeMemoryChunk slab = new UnsafeMemoryChunk(1024*1024);
+     boolean freeSlab = true;
+     UnsafeMemoryChunk[] slabs = new UnsafeMemoryChunk[]{slab};
+     try {
 -      SimpleMemoryAllocatorImpl ma = SimpleMemoryAllocatorImpl.create(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), slabs);
++      SimpleMemoryAllocatorImpl ma = SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), slabs);
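+       // close() must be safe to call more than once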
+       ma.close();
+       ma.close();
+       System.setProperty(SimpleMemoryAllocatorImpl.FREE_OFF_HEAP_MEMORY_PROPERTY, "true");
+       try {
 -        ma = SimpleMemoryAllocatorImpl.create(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), slabs);
++        ma = SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), slabs);
+         ma.close();
+         freeSlab = false;
+         ma.close();
+       } finally {
+         System.clearProperty(SimpleMemoryAllocatorImpl.FREE_OFF_HEAP_MEMORY_PROPERTY);
+       }
+     } finally {
+       if (freeSlab) {
+         SimpleMemoryAllocatorImpl.freeOffHeapMemory();
+       }
+     }
+     
+   }
+   
+   @Test
+   public void testCompaction() {
 -    final int perObjectOverhead = com.gemstone.gemfire.internal.offheap.Chunk.OFF_HEAP_HEADER_SIZE;
++    final int perObjectOverhead = com.gemstone.gemfire.internal.offheap.ObjectChunk.OFF_HEAP_HEADER_SIZE;
+     final int BIG_ALLOC_SIZE = 150000;
+     final int SMALL_ALLOC_SIZE = BIG_ALLOC_SIZE/2;
+     final int TOTAL_MEM = BIG_ALLOC_SIZE;
+     UnsafeMemoryChunk slab = new UnsafeMemoryChunk(TOTAL_MEM);
+     try {
 -      SimpleMemoryAllocatorImpl ma = SimpleMemoryAllocatorImpl.create(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{slab});
 -      MemoryChunk bmc = ma.allocate(BIG_ALLOC_SIZE-perObjectOverhead, null);
++      SimpleMemoryAllocatorImpl ma = SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{slab});
++      MemoryChunk bmc = ma.allocate(BIG_ALLOC_SIZE-perObjectOverhead);
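+       // bmc consumes the entire slab, so further allocations fail until memory is released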
+       try {
 -        MemoryChunk smc = ma.allocate(SMALL_ALLOC_SIZE-perObjectOverhead, null);
++        MemoryChunk smc = ma.allocate(SMALL_ALLOC_SIZE-perObjectOverhead);
+         fail("Expected out of memory");
+       } catch (OutOfOffHeapMemoryException expected) {
+       }
+       bmc.release();
+       assertEquals(TOTAL_MEM, ma.freeList.getFreeMemory());
 -      MemoryChunk smc1 = ma.allocate(SMALL_ALLOC_SIZE-perObjectOverhead, null);
 -      MemoryChunk smc2 = ma.allocate(SMALL_ALLOC_SIZE-perObjectOverhead, null);
++      MemoryChunk smc1 = ma.allocate(SMALL_ALLOC_SIZE-perObjectOverhead);
++      MemoryChunk smc2 = ma.allocate(SMALL_ALLOC_SIZE-perObjectOverhead);
+       smc2.release();
+       assertEquals(TOTAL_MEM-SMALL_ALLOC_SIZE, ma.freeList.getFreeMemory());
+       try {
 -        bmc = ma.allocate(BIG_ALLOC_SIZE-perObjectOverhead, null);
++        bmc = ma.allocate(BIG_ALLOC_SIZE-perObjectOverhead);
+         fail("Expected out of memory");
+       } catch (OutOfOffHeapMemoryException expected) {
+       }
+       smc1.release();
+       assertEquals(TOTAL_MEM, ma.freeList.getFreeMemory());
 -      bmc = ma.allocate(BIG_ALLOC_SIZE-perObjectOverhead, null);
++      bmc = ma.allocate(BIG_ALLOC_SIZE-perObjectOverhead);
+       bmc.release();
+       assertEquals(TOTAL_MEM, ma.freeList.getFreeMemory());
+       ArrayList<MemoryChunk> mcs = new ArrayList<MemoryChunk>();
+       for (int i=0; i < BIG_ALLOC_SIZE/(8+perObjectOverhead); i++) {
 -        mcs.add(ma.allocate(8, null));
++        mcs.add(ma.allocate(8));
+       }
+       checkMcs(mcs);
+       assertEquals(0, ma.freeList.getFreeMemory());
+       try {
 -        ma.allocate(8, null);
++        ma.allocate(8);
+         fail("expected out of memory");
+       } catch (OutOfOffHeapMemoryException expected) {
+       }
+       mcs.remove(0).release(); // frees 8+perObjectOverhead
+       assertEquals(8+perObjectOverhead, ma.freeList.getFreeMemory());
+       mcs.remove(0).release(); // frees 8+perObjectOverhead
+       assertEquals((8+perObjectOverhead)*2, ma.freeList.getFreeMemory());
 -      ma.allocate(16, null).release(); // allocates and frees 16+perObjectOverhead; still have perObjectOverhead
++      ma.allocate(16).release(); // allocates and frees 16+perObjectOverhead; still have perObjectOverhead
+       assertEquals((8+perObjectOverhead)*2, ma.freeList.getFreeMemory());
+       mcs.remove(0).release(); // frees 8+perObjectOverhead
+       assertEquals((8+perObjectOverhead)*3, ma.freeList.getFreeMemory());
+       mcs.remove(0).release(); // frees 8+perObjectOverhead
+       assertEquals((8+perObjectOverhead)*4, ma.freeList.getFreeMemory());
+       // At this point I should have 8*4 + perObjectOverhead*4 of free memory
 -      ma.allocate(8*4+perObjectOverhead*3, null).release();
++      ma.allocate(8*4+perObjectOverhead*3).release();
+       assertEquals((8+perObjectOverhead)*4, ma.freeList.getFreeMemory());
+       mcs.remove(0).release(); // frees 8+perObjectOverhead
+       assertEquals((8+perObjectOverhead)*5, ma.freeList.getFreeMemory());
+       // At this point I should have 8*5 + perObjectOverhead*5 of free memory
+       try {
 -        ma.allocate((8*5+perObjectOverhead*4)+1, null);
++        ma.allocate((8*5+perObjectOverhead*4)+1);
+         fail("expected out of memory");
+       } catch (OutOfOffHeapMemoryException expected) {
+       }
+       mcs.remove(0).release(); // frees 8+perObjectOverhead
+       assertEquals((8+perObjectOverhead)*6, ma.freeList.getFreeMemory());
+       checkMcs(mcs);
+       // At this point I should have 8*6 + perObjectOverhead*6 of free memory
 -      MemoryChunk mc24 = ma.allocate(24, null);
++      MemoryChunk mc24 = ma.allocate(24);
+       checkMcs(mcs);
+       assertEquals((8+perObjectOverhead)*6 - (24+perObjectOverhead), ma.freeList.getFreeMemory());
+       // At this point I should have 8*3 + perObjectOverhead*5 of free memory
 -      MemoryChunk mc16 = ma.allocate(16, null);
++      MemoryChunk mc16 = ma.allocate(16);
+       checkMcs(mcs);
+       assertEquals((8+perObjectOverhead)*6 - (24+perObjectOverhead) - (16+perObjectOverhead), ma.freeList.getFreeMemory());
+       // At this point I should have 8*1 + perObjectOverhead*4 of free memory
 -      mcs.add(ma.allocate(8, null));
++      mcs.add(ma.allocate(8));
+       checkMcs(mcs);
+       assertEquals((8+perObjectOverhead)*6 - (24+perObjectOverhead) - (16+perObjectOverhead) - (8+perObjectOverhead), ma.freeList.getFreeMemory());
+       // At this point I should have 8*0 + perObjectOverhead*3 of free memory
 -      MemoryChunk mcDO = ma.allocate(perObjectOverhead*2, null);
++      MemoryChunk mcDO = ma.allocate(perObjectOverhead*2);
+       checkMcs(mcs);
+       // At this point I should have 8*0 + perObjectOverhead*0 of free memory
+       assertEquals(0, ma.freeList.getFreeMemory());
+       try {
 -        ma.allocate(1, null);
++        ma.allocate(1);
+         fail("expected out of memory");
+       } catch (OutOfOffHeapMemoryException expected) {
+       }
+       checkMcs(mcs);
+       assertEquals(0, ma.freeList.getFreeMemory());
+       mcDO.release();
+       assertEquals((perObjectOverhead*3), ma.freeList.getFreeMemory());
+       mcs.remove(mcs.size()-1).release();
+       assertEquals((perObjectOverhead*3)+(8+perObjectOverhead), ma.freeList.getFreeMemory());
+       mc16.release();
+       assertEquals((perObjectOverhead*3)+(8+perObjectOverhead)+(16+perObjectOverhead), ma.freeList.getFreeMemory());
+       mc24.release();
+       assertEquals((perObjectOverhead*3)+(8+perObjectOverhead)+(16+perObjectOverhead)+(24+perObjectOverhead), ma.freeList.getFreeMemory());
+       
+       long freeMem = ma.freeList.getFreeMemory();
+       for (MemoryChunk mc: mcs) {
+         mc.release();
+         assertEquals(freeMem+(8+perObjectOverhead), ma.freeList.getFreeMemory());
+         freeMem += (8+perObjectOverhead);
+       }
+       mcs.clear();
+       assertEquals(TOTAL_MEM, ma.freeList.getFreeMemory());
 -      bmc = ma.allocate(BIG_ALLOC_SIZE-perObjectOverhead, null);
++      bmc = ma.allocate(BIG_ALLOC_SIZE-perObjectOverhead);
+       bmc.release();
+     } finally {
+       SimpleMemoryAllocatorImpl.freeOffHeapMemory();
+     }
+   }
+   
+   long expectedMemoryUsage;
+   boolean memoryUsageEventReceived;
+   @Test
+   public void testUsageEventListener() {
 -    final int perObjectOverhead = com.gemstone.gemfire.internal.offheap.Chunk.OFF_HEAP_HEADER_SIZE;
++    final int perObjectOverhead = com.gemstone.gemfire.internal.offheap.ObjectChunk.OFF_HEAP_HEADER_SIZE;
+     final int SMALL_ALLOC_SIZE = 1000;
+     UnsafeMemoryChunk slab = new UnsafeMemoryChunk(3000);
+     try {
 -      SimpleMemoryAllocatorImpl ma = SimpleMemoryAllocatorImpl.create(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{slab});
++      SimpleMemoryAllocatorImpl ma = SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{slab});
+       MemoryUsageListener listener = new MemoryUsageListener() {
+         @Override
+         public void updateMemoryUsed(final long bytesUsed) {
+           SimpleMemoryAllocatorJUnitTest.this.memoryUsageEventReceived = true;
+           assertEquals(SimpleMemoryAllocatorJUnitTest.this.expectedMemoryUsage, bytesUsed);
+         }
+       };
+       ma.addMemoryUsageListener(listener);
+       
+       this.expectedMemoryUsage = SMALL_ALLOC_SIZE;
+       this.memoryUsageEventReceived = false;
 -      MemoryChunk smc = ma.allocate(SMALL_ALLOC_SIZE-perObjectOverhead, null);
++      MemoryChunk smc = ma.allocate(SMALL_ALLOC_SIZE-perObjectOverhead);
+       assertEquals(true, this.memoryUsageEventReceived);
+       
+       this.expectedMemoryUsage = SMALL_ALLOC_SIZE * 2;
+       this.memoryUsageEventReceived = false;
 -      smc = ma.allocate(SMALL_ALLOC_SIZE-perObjectOverhead, null);
++      smc = ma.allocate(SMALL_ALLOC_SIZE-perObjectOverhead);
+       assertEquals(true, this.memoryUsageEventReceived);
+       
+       MemoryUsageListener unaddedListener = new MemoryUsageListener() {
+         @Override
+         public void updateMemoryUsed(final long bytesUsed) {
+           throw new IllegalStateException("Should never be called");
+         }
+       };
+       ma.removeMemoryUsageListener(unaddedListener);
+       
+       ma.removeMemoryUsageListener(listener);
+       
+       ma.removeMemoryUsageListener(unaddedListener);
+ 
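+       // once the listener is removed it must no longer receive usage events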
+       this.expectedMemoryUsage = SMALL_ALLOC_SIZE * 2;
+       this.memoryUsageEventReceived = false;
 -      smc = ma.allocate(SMALL_ALLOC_SIZE-perObjectOverhead, null);
++      smc = ma.allocate(SMALL_ALLOC_SIZE-perObjectOverhead);
+       assertEquals(false, this.memoryUsageEventReceived);
+       
+     } finally {
+       SimpleMemoryAllocatorImpl.freeOffHeapMemory();
+     }
+   }
+   private void checkMcs(ArrayList<MemoryChunk> mcs) {
+     for (MemoryChunk mc: mcs) {
+       assertEquals(8+8, mc.getSize());
+     }
+   }
+   
+   @Test
+   public void testOutOfOffHeapMemory() {
 -    final int perObjectOverhead = com.gemstone.gemfire.internal.offheap.Chunk.OFF_HEAP_HEADER_SIZE;
++    final int perObjectOverhead = com.gemstone.gemfire.internal.offheap.ObjectChunk.OFF_HEAP_HEADER_SIZE;
+     final int BIG_ALLOC_SIZE = 150000;
+     final int SMALL_ALLOC_SIZE = BIG_ALLOC_SIZE/2;
+     final int TOTAL_MEM = BIG_ALLOC_SIZE;
+     final UnsafeMemoryChunk slab = new UnsafeMemoryChunk(TOTAL_MEM);
+     final AtomicReference<OutOfOffHeapMemoryException> ooom = new AtomicReference<OutOfOffHeapMemoryException>();
+     final OutOfOffHeapMemoryListener oooml = new OutOfOffHeapMemoryListener() {
+       @Override
+       public void outOfOffHeapMemory(OutOfOffHeapMemoryException cause) {
+         ooom.set(cause);
+       }
+       @Override
+       public void close() {
+       }
+     };
+     try {
 -      SimpleMemoryAllocatorImpl ma = SimpleMemoryAllocatorImpl.create(oooml, new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{slab});
++      SimpleMemoryAllocatorImpl ma = SimpleMemoryAllocatorImpl.createForUnitTest(oooml, new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{slab});
+       // make a big allocation
 -      MemoryChunk bmc = ma.allocate(BIG_ALLOC_SIZE-perObjectOverhead, null);
++      MemoryChunk bmc = ma.allocate(BIG_ALLOC_SIZE-perObjectOverhead);
+       assertNull(ooom.get());
+       // drive the allocator to an out-of-off-heap-memory condition with a small allocation that cannot fit
+       try {
 -        MemoryChunk smc = ma.allocate(SMALL_ALLOC_SIZE-perObjectOverhead, null);
++        MemoryChunk smc = ma.allocate(SMALL_ALLOC_SIZE-perObjectOverhead);
+         fail("Expected out of memory");
+       } catch (OutOfOffHeapMemoryException expected) {
+       }
+       assertNotNull(ooom.get());
+       assertTrue(ooom.get().getMessage().contains("Out of off-heap memory. Could not allocate size of "));
+     } finally {
+       SimpleMemoryAllocatorImpl.freeOffHeapMemory();
+     }
+   }
+ }