You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2016/02/22 22:44:05 UTC

[077/100] [abbrv] incubator-geode git commit: GEODE-917: Merge branch 'feature/GEODE-917' into develop

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventImpl.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventImpl.java
index 0000000,4df8f35..e19d7bf
mode 000000,100644..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventImpl.java
@@@ -1,0 -1,1286 +1,1286 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package com.gemstone.gemfire.internal.cache.wan;
+ 
+ import java.io.DataInput;
+ import java.io.DataOutput;
+ import java.io.IOException;
+ import java.io.InputStream;
+ 
+ import com.gemstone.gemfire.DataSerializer;
+ import com.gemstone.gemfire.InternalGemFireError;
+ import com.gemstone.gemfire.cache.CacheEvent;
+ import com.gemstone.gemfire.cache.CacheFactory;
+ import com.gemstone.gemfire.cache.EntryEvent;
+ import com.gemstone.gemfire.cache.Operation;
+ import com.gemstone.gemfire.cache.Region;
+ import com.gemstone.gemfire.cache.SerializedCacheValue;
+ import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent;
+ import com.gemstone.gemfire.cache.util.ObjectSizer;
+ import com.gemstone.gemfire.cache.wan.EventSequenceID;
+ import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+ import com.gemstone.gemfire.internal.DataSerializableFixedID;
+ import com.gemstone.gemfire.internal.InternalDataSerializer;
+ import com.gemstone.gemfire.internal.Version;
+ import com.gemstone.gemfire.internal.VersionedDataInputStream;
+ import com.gemstone.gemfire.internal.cache.CachedDeserializable;
+ import com.gemstone.gemfire.internal.cache.CachedDeserializableFactory;
+ import com.gemstone.gemfire.internal.cache.Conflatable;
+ import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+ import com.gemstone.gemfire.internal.cache.EnumListenerEvent;
+ import com.gemstone.gemfire.internal.cache.EventID;
+ import com.gemstone.gemfire.internal.cache.LocalRegion;
+ import com.gemstone.gemfire.internal.cache.Token;
+ import com.gemstone.gemfire.internal.cache.WrappedCallbackArgument;
+ import com.gemstone.gemfire.internal.cache.lru.Sizeable;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerHelper;
+ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 -import com.gemstone.gemfire.internal.offheap.Chunk;
 -import com.gemstone.gemfire.internal.offheap.ChunkWithHeapForm;
++import com.gemstone.gemfire.internal.offheap.ObjectChunk;
++import com.gemstone.gemfire.internal.offheap.ObjectChunkWithHeapForm;
+ import com.gemstone.gemfire.internal.offheap.OffHeapHelper;
+ import com.gemstone.gemfire.internal.offheap.ReferenceCountHelper;
+ import com.gemstone.gemfire.internal.offheap.Releasable;
+ import com.gemstone.gemfire.internal.offheap.StoredObject;
+ import com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier;
+ import com.gemstone.gemfire.internal.offheap.annotations.Released;
+ import com.gemstone.gemfire.internal.offheap.annotations.Retained;
+ import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
+ 
+ /**
+  * Class <code>GatewaySenderEventImpl</code> represents an event sent between
+  * <code>GatewaySender</code>
+  * 
+  * @author Suranjan Kumar
+  * 
+  * @since 7.0
+  * 
+  */
+ public class GatewaySenderEventImpl implements 
+     AsyncEvent, DataSerializableFixedID, Conflatable, Sizeable, Releasable {
+   private static final long serialVersionUID = -5690172020872255422L;
+ 
+   protected static final Object TOKEN_NULL = new Object();
+ 
+   protected static final short VERSION = 0x11;
+   
+   protected EnumListenerEvent operation;
+ 
+   protected Object substituteValue;
+ 
+   /**
+    * The action to be taken (e.g. AFTER_CREATE)
+    */
+   protected int action;
+   
+   /**
+    * The operation detail of EntryEvent (e.g. LOAD, PUTALL etc.)
+    */
+   protected int operationDetail;
+   
+   /**
+    * The number of parts for the <code>Message</code>
+    * 
+    * @see com.gemstone.gemfire.internal.cache.tier.sockets.Message
+    */
+   protected int numberOfParts;
+ 
+   /**
+    * The identifier of this event
+    */
+   protected EventID id;
+ 
+   /**
+    * The <code>Region</code> that was updated
+    */
+   private transient LocalRegion region;
+ 
+   /**
+    * The name of the region being affected by this event
+    */
+   protected String regionPath;
+ 
+   /**
+    * The key being affected by this event
+    */
+   protected Object key;
+ 
+   /**
+    * The serialized new value for this event's key.
+    * May not be computed at construction time.
+    */
+   protected byte[] value;
+   
+   /**
+    * The "object" form of the value.
+    * Will be null after this object is deserialized.
+    */
+   @Retained(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE)
+   protected transient Object valueObj;
+   protected transient boolean valueObjReleased;
+ 
+   /**
+    * Whether the value is a serialized object or just a byte[]
+    */
+   protected byte valueIsObject;
+ 
+   /**
+    * The callback argument for this event
+    */
+   protected GatewaySenderEventCallbackArgument callbackArgument;
+ 
+   /**
+    * The version timestamp
+    */
+   protected long versionTimeStamp;
+   
+   /**
+    * Whether this event is a possible duplicate
+    */
+   protected boolean possibleDuplicate;
+ 
+   /**
+    * Whether this event is acknowledged after the ack received by
+    * AckReaderThread. As of now this is getting used for PDX related
+    * GatewaySenderEvent. But can be extended for for other GatewaySenderEvent.
+    */
+   protected volatile boolean isAcked;
+   
+   /**
+    * Whether this event is dispatched by dispatcher. As of now this is getting
+    * used for PDX related GatewaySenderEvent. But can be extended for for other
+    * GatewaySenderEvent.
+    */
+   protected volatile boolean isDispatched;
+   /**
+    * The creation timestamp in ms
+    */
+   protected long creationTime;
+ 
+   /**
+    * For ParalledGatewaySender we need bucketId of the PartitionRegion on which
+    * the update operation was applied.
+    */
+   protected int bucketId;
+ 
+   protected Long shadowKey = Long.valueOf(-1L);
+   
+   protected boolean isInitialized;
+ 
+   /**
+    * Is this thread in the process of serializing this event?
+    */
+   public static final ThreadLocal isSerializingValue = new ThreadLocal() {
+     @Override
+     protected Object initialValue() {
+       return Boolean.FALSE;
+     }
+   };
+ 
+   private static final int CREATE_ACTION = 0;
+ 
+   private static final int UPDATE_ACTION = 1;
+ 
+   private static final int DESTROY_ACTION = 2;
+ 
+   private static final int VERSION_ACTION = 3;
+   
+   private static final int INVALIDATE_ACTION = 5;
+   /**
+    * Static constants for Operation detail of EntryEvent.
+    */
+   private static final int OP_DETAIL_NONE = 10;
+   
+   private static final int OP_DETAIL_LOCAL_LOAD = 11;
+   
+   private static final int OP_DETAIL_NET_LOAD = 12;
+   
+   private static final int OP_DETAIL_PUTALL = 13;
+   
+   private static final int OP_DETAIL_REMOVEALL = 14;
+ 
+ //  /**
+ //   * Is this thread in the process of deserializing this event?
+ //   */
+ //  public static final ThreadLocal isDeserializingValue = new ThreadLocal() {
+ //    @Override
+ //    protected Object initialValue() {
+ //      return Boolean.FALSE;
+ //    }
+ //  };
+ 
+   /**
+    * Constructor. No-arg constructor for data serialization.
+    * 
+    * @see DataSerializer
+    */
+   public GatewaySenderEventImpl() {
+   }
+ 
+   /**
+    * Constructor. Creates an initialized <code>GatewayEventImpl</code>
+    * 
+    * @param operation
+    *          The operation for this event (e.g. AFTER_CREATE)
+    * @param event
+    *          The <code>CacheEvent</code> on which this
+    *          <code>GatewayEventImpl</code> is based
+    * @param substituteValue
+    *          The value to be enqueued instead of the value in the event.
+    * 
+    * @throws IOException
+    */
+   @Retained
+   public GatewaySenderEventImpl(EnumListenerEvent operation, CacheEvent event,
+       Object substituteValue) throws IOException {
+     this(operation, event, substituteValue, true);
+   }
+ 
+   @Retained
+   public GatewaySenderEventImpl(EnumListenerEvent operation, CacheEvent event,
+       Object substituteValue, boolean initialize, int bucketId)
+       throws IOException {
+     this(operation, event, substituteValue, initialize);
+     this.bucketId = bucketId;
+   }
+ 
+   /**
+    * Constructor.
+    * 
+    * @param operation
+    *          The operation for this event (e.g. AFTER_CREATE)
+    * @param ce
+    *          The <code>CacheEvent</code> on which this
+    *          <code>GatewayEventImpl</code> is based
+    * @param substituteValue
+    *          The value to be enqueued instead of the value in the event.
+    * @param initialize
+    *          Whether to initialize this instance
+    * 
+    * @throws IOException
+    */
+   @Retained
+   public GatewaySenderEventImpl(EnumListenerEvent operation, CacheEvent ce,
+       Object substituteValue, boolean initialize) throws IOException {
+     // Set the operation and event
+     final EntryEventImpl event = (EntryEventImpl)ce;
+     this.operation = operation;
+     this.substituteValue = substituteValue;
+ 
+     // Initialize the region name. This is being done here because the event
+     // can get serialized/deserialized (for some reason) between the time
+     // it is set above and used (in initialize). If this happens, the
+     // region is null because it is a transient field of the event.
+     this.region = (LocalRegion)event.getRegion();
+     this.regionPath = this.region.getFullPath();
+ 
+     // Initialize the unique id
+     initializeId(event);
+ 
+     // Initialize possible duplicate
+     this.possibleDuplicate = event.isPossibleDuplicate(); 
+     
+     //Initialize ack and dispatch status of events
+     this.isAcked = false;
+     this.isDispatched = false;
+     
+ 
+     // Initialize the creation timestamp
+     this.creationTime = System.currentTimeMillis();
+ 
+     if (event.getVersionTag() != null) {
+       this.versionTimeStamp = event.getVersionTag().getVersionTimeStamp();
+     }
+ 
+     // Set key
+     // System.out.println("this._entryEvent: " + event);
+     // System.out.println("this._entryEvent.getKey(): " +
+     // event.getKey());
+     this.key = event.getKey();
+ 
+     initializeValue(event);
+ 
+     // Set the callback arg
+     this.callbackArgument = (GatewaySenderEventCallbackArgument)
+         event.getRawCallbackArgument();
+ 
+     // Initialize the action and number of parts (called after _callbackArgument
+     // is set above)
+     initializeAction(this.operation);
+     
+     //initialize the operation detail 
+     initializeOperationDetail(event.getOperation());
+ 
+     setShadowKey(event.getTailKey());
+     
+     if (initialize) {
+       initialize();
+     }
+   }
+ 
+   /**
+    * Used to create a heap copy of an offHeap event.
+    * Note that this constructor produces an instance that does not need to be released.
+    */
+   protected GatewaySenderEventImpl(GatewaySenderEventImpl offHeapEvent) {
+     this.operation = offHeapEvent.operation;
+     this.action = offHeapEvent.action;
+     this.numberOfParts = offHeapEvent.numberOfParts;
+     this.id = offHeapEvent.id;
+     this.region = offHeapEvent.region;
+     this.regionPath = offHeapEvent.regionPath;
+     this.key = offHeapEvent.key;
+     this.callbackArgument = offHeapEvent.callbackArgument;
+     this.versionTimeStamp = offHeapEvent.versionTimeStamp;
+     this.possibleDuplicate = offHeapEvent.possibleDuplicate;
+     this.isAcked = offHeapEvent.isAcked;
+     this.isDispatched = offHeapEvent.isDispatched;
+     this.creationTime = offHeapEvent.creationTime;
+     this.bucketId = offHeapEvent.bucketId;
+     this.shadowKey = offHeapEvent.shadowKey;
+     this.isInitialized = offHeapEvent.isInitialized;
+ 
+     this.valueObj = null;
+     this.valueObjReleased = false;
+     this.valueIsObject = offHeapEvent.valueIsObject;
+     this.value = offHeapEvent.getSerializedValue();
+   }
+   
+   /**
+    * Returns this event's action
+    * 
+    * @return this event's action
+    */
+   public int getAction() {
+     return this.action;
+   }
+ 
+   /**
+    * Returns this event's operation
+    * 
+    * @return this event's operation
+    */
+   public Operation getOperation() {
+     Operation op = null;
+     switch (this.action) {
+     case CREATE_ACTION:
+       switch (this.operationDetail) {
+       case OP_DETAIL_LOCAL_LOAD:
+         op = Operation.LOCAL_LOAD_CREATE;
+         break;
+       case OP_DETAIL_NET_LOAD:
+     	op = Operation.NET_LOAD_CREATE;
+     	break;
+       case OP_DETAIL_PUTALL:
+         op = Operation.PUTALL_CREATE;
+     	break;
+       case OP_DETAIL_NONE:
+     	op = Operation.CREATE;
+     	break;
+       //if operationDetail is none of the above, then default should be NONE 
+       default:
+     	op = Operation.CREATE;
+     	break;
+       }
+       break;
+     case UPDATE_ACTION:
+       switch (this.operationDetail) {
+       case OP_DETAIL_LOCAL_LOAD:
+     	op = Operation.LOCAL_LOAD_UPDATE;
+     	break;
+       case OP_DETAIL_NET_LOAD:
+     	op = Operation.NET_LOAD_UPDATE;
+     	break;
+       case OP_DETAIL_PUTALL:
+     	op = Operation.PUTALL_UPDATE;
+     	break;
+       case OP_DETAIL_NONE:
+     	op = Operation.UPDATE;
+     	break;
+       //if operationDetail is none of the above, then default should be NONE 
+       default:
+     	op = Operation.UPDATE;
+     	break;
+       }
+       break;
+     case DESTROY_ACTION:
+       if (this.operationDetail == OP_DETAIL_REMOVEALL) {
+         op = Operation.REMOVEALL_DESTROY;
+       } else {
+         op = Operation.DESTROY;
+       }
+       break;
+     case VERSION_ACTION:
+       op = Operation.UPDATE_VERSION_STAMP;
+       break;
+     case INVALIDATE_ACTION:
+       op = Operation.INVALIDATE;
+       break;
+     }
+     return op;
+   }
+ 
+   public Object getSubstituteValue() {
+     return this.substituteValue;
+   }
+   
+   public EnumListenerEvent getEnumListenerEvent(){
+     return this.operation;
+   }
+   /**
+    * Return this event's region name
+    * 
+    * @return this event's region name
+    */
+   public String getRegionPath() {
+     return this.regionPath;
+   }
+ 
+   public boolean isInitialized() {
+     return this.isInitialized;
+   }
+   /**
+    * Returns this event's key
+    * 
+    * @return this event's key
+    */
+   public Object getKey() {
+     // TODO:Asif : Ideally would like to have throw exception if the key
+     // is TOKEN_UN_INITIALIZED, but for the time being trying to retain the GFE
+     // behaviour
+     // of returning null if getKey is invoked on un-initialized gateway event
+     return isInitialized() ? this.key : null;
+   }
+ 
+   /**
+    * Returns whether this event's value is a serialized object
+    * 
+    * @return whether this event's value is a serialized object
+    */
+   public byte getValueIsObject() {
+     return this.valueIsObject;
+   }
+ 
+   /**
+    * Return this event's callback argument
+    * 
+    * @return this event's callback argument
+    */
+   public Object getCallbackArgument() {
+     Object result = getSenderCallbackArgument();
+     while (result instanceof WrappedCallbackArgument) {
+       WrappedCallbackArgument wca = (WrappedCallbackArgument)result;
+       result = wca.getOriginalCallbackArg();
+     }
+     return result;
+   }
+ 
+   public GatewaySenderEventCallbackArgument getSenderCallbackArgument() {
+     return this.callbackArgument;
+   }
+ 
+   /**
+    * Return this event's number of parts
+    * 
+    * @return this event's number of parts
+    */
+   public int getNumberOfParts() {
+     return this.numberOfParts;
+   }
+   
+   /**
+    * Return the value as a byte[] array, if it is plain byte array,
+    * otherwise return a cache deserializable or plain object, depending
+    * on if the currently held form of the object is serialized or not.
+    * 
+    * If the object is held off heap, this will copy it to the heap return the heap copy.
+    * 
+    *  //OFFHEAP TODO: Optimize callers by returning a reference to the off heap value
+    */
+   public Object getValue() {
+     if (CachedDeserializableFactory.preferObject()) {
+       // sqlf does not use CacheDeserializable wrappers
+       return getDeserializedValue();
+     }
+     Object rawValue = this.value;
+     if (rawValue == null) {
+       @Unretained(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE)
+       Object vo = this.valueObj;
+       if (vo instanceof StoredObject) {
+         rawValue = ((StoredObject) vo).getValueAsHeapByteArray();
+       } else {
+         rawValue = vo;
+       }
+     }
+     if (valueIsObject == 0x00) {
+       //if the value is a byte array, just return it
+       return rawValue;
+     } else if (CachedDeserializableFactory.preferObject()) {
+       // sqlf does not use CacheDeserializable wrappers
+       return rawValue;
+     } else if (rawValue instanceof byte[]) {
+       return CachedDeserializableFactory.create((byte[]) rawValue);
+     } else {
+       return rawValue;
+     }
+   }
+   
+   /**
+    * Return the currently held form of the object.
+    * May return a retained OFF_HEAP_REFERENCE.
+    */
+   @Retained
+   public Object getRawValue() {
+     @Retained(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE)
+     Object result = this.value;
+     if (result == null) {
+       result = this.valueObj;
 -      if (result instanceof Chunk) {
++      if (result instanceof ObjectChunk) {
+         if (this.valueObjReleased) {
+           result = null;
+         } else {
 -          Chunk ohref = (Chunk) result;
++          ObjectChunk ohref = (ObjectChunk) result;
+           if (!ohref.retain()) {
+             result = null;
+           } else if (this.valueObjReleased) {
+             ohref.release();
+             result = null;
+           }
+         }
+       }
+     }
+     return result;
+   }
+ 
+   /**
+    * This method is meant for internal use by the SimpleMemoryAllocatorImpl.
+    * Others should use getRawValue instead.
+    * @return if the result is an off-heap reference then callers must use it before this event is released.
+    */
+   @Unretained(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE)
+   public Object getValueObject() {
+     return this.valueObj;
+   }
+ 
+   /**
+    * Return this event's deserialized value
+    * 
+    * @return this event's deserialized value
+    */
+   public Object getDeserializedValue() {
+ // TODO OFFHEAP MERGE: handle substituteValue here?
+     if (this.valueIsObject == 0x00) {
+       Object result = this.value;
+       if (result == null) {
+         @Unretained(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE)
+         Object so = this.valueObj;
+         if (this.valueObjReleased) {
+           throw new IllegalStateException("Value is no longer available. getDeserializedValue must be called before processEvents returns.");
+         }
+         if (so instanceof StoredObject) {
+           // TODO OFFHEAP: returns off-heap PdxInstance
+           return ((StoredObject)so).getValueAsDeserializedHeapObject();
+         } else {
+           throw new IllegalStateException("expected valueObj field to be an instance of StoredObject but it was " + so);
+         }
+       }
+       return result;
+     }
+     else {
+       Object vo = this.valueObj;
+       if (vo != null) {
+         if (vo instanceof StoredObject) {
+           @Unretained(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE)
+           StoredObject so = (StoredObject)vo;
+           // TODO OFFHEAP: returns off-heap PdxInstance
+           return so.getValueAsDeserializedHeapObject();
+         } else {
+           return vo; // it is already deserialized
+         }
+       } else {
+         if (this.value != null) {
+           Object result = EntryEventImpl.deserialize(this.value);
+           this.valueObj = result;
+           return result;
+         } else {
+           if (this.valueObjReleased) {
+             throw new IllegalStateException("Value is no longer available. getDeserializedValue must be called before processEvents returns.");
+           }
+           // both value and valueObj are null but we did not free it.
+           return null;
+         }
+       }
+     }
+   }
+   
+   /**
+    * Returns the value in the form of a String.
+    * This should be used by code that wants to log
+    * the value. This is a debugging exception.
+    */
+   public String getValueAsString(boolean deserialize) {
+ // TODO OFFHEAP MERGE: handle substituteValue here?
+     Object v = this.value;
+     if (deserialize) {
+       try {
+         v = getDeserializedValue();
+       } catch (Exception e) {
+         return "Could not convert value to string because " + e;
+       } catch (InternalGemFireError e) { // catch this error for bug 49147
+         return "Could not convert value to string because " + e;
+       }
+     }
+     if (v == null) {
+       @Unretained(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE)
+       Object ov = this.valueObj;
+       if (ov instanceof CachedDeserializable) {
+         return ((CachedDeserializable) ov).getStringForm();
+       }
+     }
+     if (v != null) {
+       if (v instanceof byte[]) {
+         byte[] bav = (byte[]) v;
+         // Using Arrays.toString(bav) can cause us to run out of memory
+         return "byte[" + bav.length + "]";
+       } else {
+         return v.toString();
+       }
+     } else {
+       return "";
+     }
+   }
+ 
+   /**
+    * If the value owned of this event is just bytes return that byte array;
+    * otherwise serialize the value object and return the serialized bytes.
+    * Use {@link #getValueIsObject()} to determine if the result is raw or serialized bytes.
+    */
+   public byte[] getSerializedValue() {
+     byte[] result = this.value;
+     if (result == null) {
+       @Unretained(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE)
+       Object vo = this.valueObj;
+       if (vo instanceof StoredObject) {
+         synchronized (this) {
+           result = this.value;
+           if (result == null) {
+             StoredObject so = (StoredObject) vo;
+             result = so.getValueAsHeapByteArray();
+             this.value = result;
+           }
+         }
+       } else {
+         synchronized (this) {
+           result = this.value;
+           if (result == null && vo != null && !(vo instanceof Token)) {
+             result = EntryEventImpl.serialize(vo);
+             this.value = result;
+           } else if (result == null) {
+             if (this.valueObjReleased) {
+               throw new IllegalStateException("Value is no longer available. getSerializedValue must be called before processEvents returns.");
+             }
+           }
+         }
+       }
+     }
+     return result;
+   }
+ 
+   public void setPossibleDuplicate(boolean possibleDuplicate) {
+     this.possibleDuplicate = possibleDuplicate;
+   }
+ 
+   public boolean getPossibleDuplicate() {
+     return this.possibleDuplicate;
+   }
+ 
+   public long getCreationTime() {
+     return this.creationTime;
+   }
+ 
+   public int getDSFID() {
+     return GATEWAY_SENDER_EVENT_IMPL;
+   }
+ 
+   public void toData(DataOutput out) throws IOException {
+     // Make sure we are initialized before we serialize.
+     initialize();
+     out.writeShort(VERSION);
+     out.writeInt(this.action);
+     out.writeInt(this.numberOfParts);
+     // out.writeUTF(this._id);
+     DataSerializer.writeObject(this.id, out);
+     DataSerializer.writeString(this.regionPath, out);
+     out.writeByte(this.valueIsObject);
+     serializeKey(out);
+     DataSerializer.writeByteArray(getSerializedValue(), out);
+     DataSerializer.writeObject(this.callbackArgument, out);
+     out.writeBoolean(this.possibleDuplicate);
+     out.writeLong(this.creationTime);
+     out.writeInt(this.bucketId);
+     out.writeLong(this.shadowKey);
+     out.writeLong(getVersionTimeStamp());    
+   }
+ 
+   protected void serializeKey(DataOutput out) throws IOException {
+     DataSerializer.writeObject(this.key, out);
+   }
+ 
+   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+     short version = in.readShort();
+     if (version != VERSION) {
+       // warning?
+     }
+     this.isInitialized = true;
+     this.action = in.readInt();
+     this.numberOfParts = in.readInt();
+     // this._id = in.readUTF();
+     if (version < 0x11 &&
+         (in instanceof InputStream) &&
+         InternalDataSerializer.getVersionForDataStream(in) == Version.CURRENT) {
+       in = new VersionedDataInputStream((InputStream)in, Version.GFE_701);
+     }
+     this.id = (EventID)DataSerializer.readObject(in);   
+     // TODO:Asif ; Check if this violates Barry's logic of not assiging VM
+     // specific Token.FROM_GATEWAY
+     // and retain the serialized Token.FROM_GATEWAY
+     // this._id.setFromGateway(false);
+     this.regionPath = DataSerializer.readString(in);
+     this.valueIsObject = in.readByte();
+     deserializeKey(in);
+     this.value = DataSerializer.readByteArray(in);
+     this.callbackArgument = (GatewaySenderEventCallbackArgument)DataSerializer
+         .readObject(in);
+     this.possibleDuplicate = in.readBoolean();
+     this.creationTime = in.readLong();
+     this.bucketId = in.readInt();
+     this.shadowKey = in.readLong();
+     this.versionTimeStamp = in.readLong();
+     // TODO should this call initializeKey()?
+   }
+ 
+   protected void deserializeKey(DataInput in) throws IOException,
+       ClassNotFoundException {
+     this.key = DataSerializer.readObject(in);
+   }
+ 
+   @Override
+   public String toString() {
+     StringBuffer buffer = new StringBuffer();
+     buffer.append("SenderEventImpl[").append("id=").append(this.id)
+         .append(";action=").append(this.action).append(";operation=")
+         .append(getOperation()).append(";region=").append(this.regionPath)
+         .append(";key=").append(this.key).append(";value=")
+         .append(getValueAsString(true)).append(";valueIsObject=")
+         .append(this.valueIsObject).append(";numberOfParts=")
+         .append(this.numberOfParts).append(";callbackArgument=")
+         .append(this.callbackArgument).append(";possibleDuplicate=")
+         .append(this.possibleDuplicate).append(";creationTime=")
+         .append(this.creationTime).append(";shadowKey= ")
+         .append(this.shadowKey)
+         .append(";timeStamp=").append(this.versionTimeStamp)
+         .append(";acked=").append(this.isAcked)
+         .append(";dispatched=").append(this.isDispatched)
+         .append("]");
+     return buffer.toString();
+   }
+ 
+   public static boolean isSerializingValue() {
+     return ((Boolean)isSerializingValue.get()).booleanValue();
+   }
+ 
+ //   public static boolean isDeserializingValue() {
+ //     return ((Boolean)isDeserializingValue.get()).booleanValue();
+ //   }
+ 
+   // / Conflatable interface methods ///
+ 
+   /**
+    * Determines whether or not to conflate this message. This method will answer
+    * true IFF the message's operation is AFTER_UPDATE and its region has enabled
+    * are conflation. Otherwise, this method will answer false. Messages whose
+    * operation is AFTER_CREATE, AFTER_DESTROY, AFTER_INVALIDATE or
+    * AFTER_REGION_DESTROY are not conflated.
+    * 
+    * @return Whether to conflate this message
+    */
+   public boolean shouldBeConflated() {
+     // If the message is an update, it may be conflatable. If it is a
+     // create, destroy, invalidate or destroy-region, it is not conflatable.
+     // Only updates are conflated.
+     return isUpdate();
+   }
+ 
+   public String getRegionToConflate() {
+     return this.regionPath;
+   }
+ 
+   public Object getKeyToConflate() {
+     return this.key;
+   }
+ 
+   public Object getValueToConflate() {
+     // Since all the uses of this are for logging
+     // changing it to return the string form of the value
+     // instead of the actual value.
+     return this.getValueAsString(true);
+   }
+ 
+   public void setLatestValue(Object value) {
+     // Currently this method is never used.
+     // If someone does want to use it in the future
+     // then the implementation needs to be updated
+     // to correctly update value, valueObj, and valueIsObject
+     throw new UnsupportedOperationException();
+   }
+ 
+   // / End Conflatable interface methods ///
+ 
+   /**
+    * Returns whether this <code>GatewayEvent</code> represents an update.
+    * 
+    * @return whether this <code>GatewayEvent</code> represents an update
+    */
+   protected boolean isUpdate() {
+     // This event can be in one of three states:
+     // - in memory primary (initialized)
+     // - in memory secondary (not initialized)
+     // - evicted to disk, read back in (initialized)
+     // In the first case, both the operation and action are set.
+     // In the second case, only the operation is set.
+     // In the third case, only the action is set.
+     return this.operation == null ? this.action == UPDATE_ACTION
+         : this.operation == EnumListenerEvent.AFTER_UPDATE;
+   }
+ 
+   /**
+    * Returns whether this <code>GatewayEvent</code> represents a create.
+    * 
+    * @return whether this <code>GatewayEvent</code> represents a create
+    */
+   protected boolean isCreate() {
+     // See the comment in isUpdate() for additional details
+     return this.operation == null ? this.action == CREATE_ACTION
+         : this.operation == EnumListenerEvent.AFTER_CREATE;
+   }
+ 
+   /**
+    * Returns whether this <code>GatewayEvent</code> represents a destroy.
+    * 
+    * @return whether this <code>GatewayEvent</code> represents a destroy
+    */
+   protected boolean isDestroy() {
+     // See the comment in isUpdate() for additional details
+     return this.operation == null ? this.action == DESTROY_ACTION
+         : this.operation == EnumListenerEvent.AFTER_DESTROY;
+   }
+ 
+   /**
+    * Initialize the unique identifier for this event. This id is used by the
+    * receiving <code>Gateway</code> to keep track of which events have been
+    * processed. Duplicates can be dropped.
+    */
+   private void initializeId(EntryEventImpl event) {
+     // CS43_HA
+     this.id = event.getEventId();
+     // TODO:ASIF :Once stabilized remove the check below
+     if (this.id == null) {
+       throw new IllegalStateException(
+           LocalizedStrings.GatewayEventImpl_NO_EVENT_ID_IS_AVAILABLE_FOR_THIS_GATEWAY_EVENT
+               .toLocalizedString());
+     }
+ 
+   }
+ 
+   /**
+    * Initialize this instance. Get the useful parts of the input operation and
+    * event.
+    */
+   public void initialize() {
+     if (isInitialized()) {
+       return;
+     }
+     this.isInitialized = true;
+   }
+ 
+ 
+   // Initializes the value object. This function need a relook because the 
+   // serialization of the value looks unnecessary.
+   @Retained(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE)
+   protected void initializeValue(EntryEventImpl event) throws IOException {
+     // Set the value to be a byte[] representation of either the value or
+     // substituteValue (if set).
+     if (this.substituteValue == null) {
+     // If the value is already serialized, use it.
+     this.valueIsObject = 0x01;
+     /**
+      * so ends up being stored in this.valueObj
+      */
+     @Retained(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE)
+     StoredObject so = null;
+     if (event.hasDelta()) {
+       this.valueIsObject = 0x02;
+     } else {
+       ReferenceCountHelper.setReferenceCountOwner(this);
+       so = event.getOffHeapNewValue();
+       ReferenceCountHelper.setReferenceCountOwner(null);      
+         // TODO OFFHEAP MERGE: check for a cached serialized value first
+         // so we can use it instead of reading offheap
+         // If we do read offheap then add the serialize new value to the event cache
+     }
+     
+     if (so != null) {
+ //    if (so != null  && !event.hasDelta()) {
+       // Since GatewaySenderEventImpl instances can live for a long time in the gateway region queue
+       // we do not want the StoredObject to be one that keeps the heap form cached.
 -      if (so instanceof ChunkWithHeapForm) {
 -        so = ((ChunkWithHeapForm) so).getChunkWithoutHeapForm(); // fixes 51999
++      if (so instanceof ObjectChunkWithHeapForm) {
++        so = ((ObjectChunkWithHeapForm) so).getChunkWithoutHeapForm(); // fixes 51999
+       }
+       this.valueObj = so;
+       if (!so.isSerialized()) {
+         this.valueIsObject = 0x00;
+       }
+     } else if (event.getCachedSerializedNewValue() != null) {
+       // We want this to have lower precedence than StoredObject so that the gateway
+       // can share a reference to the off-heap value.
+       this.value = event.getCachedSerializedNewValue();
+     } else {
+       final Object newValue = event.getRawNewValue(shouldApplyDelta());
+       assert !(newValue instanceof StoredObject); // since we already called getOffHeapNewValue() and it returned null
+       if (newValue instanceof CachedDeserializable) {
+         this.value = ((CachedDeserializable) newValue).getSerializedValue();
+       } else if (newValue instanceof byte[]) {
+         // The value is byte[]. Set _valueIsObject flag to 0x00 (not an object)
+         this.value = (byte[])newValue;
+         this.valueIsObject = 0x00;
+       } else {
+         // The value is an object. It will be serialized later when getSerializedValue is called.
+         this.valueObj = newValue;
+         // to prevent bug 48281 we need to serialize it now
+         this.getSerializedValue();
+         this.valueObj = null;
+       }
+     }
+     } else {
+       // The substituteValue is set. Use it.
+       if (this.substituteValue instanceof byte[]) {
+         // The substituteValue is byte[]. Set valueIsObject flag to 0x00 (not an object)
+         this.value = (byte[]) this.substituteValue;
+         this.valueIsObject = 0x00;
+       } else if (this.substituteValue == TOKEN_NULL) {
+         // The substituteValue represents null. Set the value to null.
+         this.value = null;
+         this.valueIsObject = 0x01;
+       } else {
+         // The substituteValue is an object. Serialize it.
+         isSerializingValue.set(Boolean.TRUE);
+         this.value = CacheServerHelper.serialize(this.substituteValue);
+         isSerializingValue.set(Boolean.FALSE);
+         event.setCachedSerializedNewValue(this.value);
+         this.valueIsObject = 0x01;
+       }
+     }
+   }
+ 
+   protected boolean shouldApplyDelta() {
+     return false;
+   }
+ 
+   /**
+    * Initialize this event's action and number of parts
+    * 
+    * @param operation
+    *          The operation from which to initialize this event's action and
+    *          number of parts
+    */
+   protected void initializeAction(EnumListenerEvent operation) {
+     if (operation == EnumListenerEvent.AFTER_CREATE) {
+       // Initialize after create action
+       this.action = CREATE_ACTION;
+ 
+       // Initialize number of parts
+       // part 1 = action
+       // part 2 = posDup flag
+       // part 3 = regionName
+       // part 4 = eventId
+       // part 5 = key
+       // part 6 = value (create and update only)
+       // part 7 = whether callbackArgument is non-null
+       // part 8 = callbackArgument (if non-null)
+       // part 9 = versionTimeStamp;
+       this.numberOfParts = (this.callbackArgument == null) ? 8 : 9;
+     } else if (operation == EnumListenerEvent.AFTER_UPDATE) {
+       // Initialize after update action
+       this.action = UPDATE_ACTION;
+ 
+       // Initialize number of parts
+       this.numberOfParts = (this.callbackArgument == null) ? 8 : 9;
+     } else if (operation == EnumListenerEvent.AFTER_DESTROY) {
+       // Initialize after destroy action
+       this.action = DESTROY_ACTION;
+ 
+       // Initialize number of parts
+       // Since there is no value, there is one less part
+       this.numberOfParts = (this.callbackArgument == null) ? 7 : 8;
+     } else if (operation == EnumListenerEvent.TIMESTAMP_UPDATE) {
+       // Initialize after destroy action
+       this.action = VERSION_ACTION;
+ 
+       // Initialize number of parts
+       // Since there is no value, there is one less part
+       this.numberOfParts = (this.callbackArgument == null) ? 7 : 8;
+     } else if (operation == EnumListenerEvent.AFTER_INVALIDATE) {
+       // Initialize after invalidate action
+       this.action = INVALIDATE_ACTION;
+ 
+       // Initialize number of parts
+       // Since there is no value, there is one less part
+       this.numberOfParts = (this.callbackArgument == null) ? 7 : 8;
+     }
+   }
+   
+   private void initializeOperationDetail(Operation operation) {
+     if (operation.isLocalLoad()) {
+       operationDetail = OP_DETAIL_LOCAL_LOAD;
+     } else if (operation.isNetLoad()) {
+       operationDetail = OP_DETAIL_NET_LOAD;
+     } else if (operation.isPutAll()) {
+       operationDetail = OP_DETAIL_PUTALL;
+     } else if (operation.isRemoveAll()) {
+       operationDetail = OP_DETAIL_REMOVEALL;
+     } else {
+       operationDetail = OP_DETAIL_NONE;
+     }
+   }
+ 
+   public EventID getEventId() {
+     return this.id;
+   }
+ 
+   /**
+    * Return the EventSequenceID of the Event
+    * @return    EventSequenceID
+    */
+   public EventSequenceID getEventSequenceID() {
+     return new EventSequenceID(id.getMembershipID(), id.getThreadID(), id
+         .getSequenceID());
+   }
+   
+   public long getVersionTimeStamp() {
+     return this.versionTimeStamp;
+   }
+   
+   public int getSizeInBytes() {
+     // Calculate the size of this event. This is used for overflow to disk.
+ 
+     // The sizes of the following variables are calculated:
+     //
+     // - the value (byte[])
+     // - the original callback argument (Object)
+     // - primitive and object instance variable references
+     //
+     // The sizes of the following variables are not calculated:
+ 
+     // - the key because it is a reference
+     // - the region and regionName because they are references
+     // - the operation because it is a reference
+     // - the entry event because it is nulled prior to calling this method
+ 
+     // The size of instances of the following internal datatypes were estimated
+     // using a NullDataOutputStream and hardcoded into this method:
+ 
+     // - the id (an instance of EventId)
+     // - the callbackArgument (an instance of GatewayEventCallbackArgument)
+ 
+     int size = 0;
+ 
+     // Add this event overhead
+     size += Sizeable.PER_OBJECT_OVERHEAD;
+ 
+     // Add object references
+     // _id reference = 4 bytes
+     // _region reference = 4 bytes
+     // _regionName reference = 4 bytes
+     // _key reference = 4 bytes
+     // _callbackArgument reference = 4 bytes
+     // _operation reference = 4 bytes
+     // _entryEvent reference = 4 bytes
+     size += 28;
+ 
+     // Add primitive references
+     // int _action = 4 bytes
+     // int _numberOfParts = 4 bytes
+     // byte _valueIsObject = 1 byte
+     // boolean _possibleDuplicate = 1 byte
+     // int bucketId = 4 bytes
+     // long shadowKey = 8 bytes
+     // long creationTime = 8 bytes
+     size += 30;
+ 
+     // Add the id (an instance of EventId)
+     // The hardcoded value below was estimated using a NullDataOutputStream
+     size += Sizeable.PER_OBJECT_OVERHEAD + 56;
+ 
+     // The value (a byte[])
+     size += getSerializedValueSize();
+ 
+     // The callback argument (a GatewayEventCallbackArgument wrapping an Object
+     // which is the original callback argument)
+     // The hardcoded value below represents the GatewayEventCallbackArgument
+     // and was estimated using a NullDataOutputStream
+     size += Sizeable.PER_OBJECT_OVERHEAD + 194;
+     // The sizeOf call gets the size of the input callback argument.
+     size += Sizeable.PER_OBJECT_OVERHEAD + sizeOf(getCallbackArgument());
+ 
+     // the version timestamp
+     size += 8;
+     
+     return size;
+   }
+ 
+   private int sizeOf(Object obj) {
+     int size = 0;
+     if (obj == null) {
+       return size;
+     }
+     if (obj instanceof String) {
+       size = ObjectSizer.DEFAULT.sizeof(obj);
+     } else if (obj instanceof Integer) {
+       size = 4; // estimate
+     } else if (obj instanceof Long) {
+       size = 8; // estimate
+     } else {
+       size = CachedDeserializableFactory.calcMemSize(obj)
+           - Sizeable.PER_OBJECT_OVERHEAD;
+     }
+     return size;
+   }
+ 
+ 
+   // Asif: If the GatewayEvent serializes to a node where the region itself may
+   // not be present or the
+   // region is not created yet , and if the gateway event queue is persistent,
+   // then even if
+   // we try to set the region in the fromData , we may still get null. Though
+   // the product is
+   // not using this method anywhere still not comfortable changing the Interface
+   // so
+   // modifying the implementation a bit.
+ 
+   public Region<?, ?> getRegion() {
+     // The region will be null mostly for the other node where the gateway event
+     // is serialized
+     return this.region != null ? this.region : CacheFactory.getAnyInstance()
+         .getRegion(this.regionPath);
+   }
+ 
+   public int getBucketId() {
+     return bucketId;
+   }
+ 
+   /**
+    * @param tailKey
+    *          the tailKey to set
+    */
+   public void setShadowKey(Long tailKey) {
+     this.shadowKey = tailKey;
+   }
+ 
+   /**
+    * @return the tailKey
+    */
+   public Long getShadowKey() {
+     return this.shadowKey;
+   }
+ 
+   @Override
+   public Version[] getSerializationVersions() {
+     // TODO Auto-generated method stub
+     return null;
+   }
+ 
+   public int getSerializedValueSize() {
+     @Unretained(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE)
+     Object vo = this.valueObj;
+     if (vo instanceof StoredObject) {
+       return ((StoredObject) vo).getSizeInBytes();
+     } else {
+       return CachedDeserializableFactory.calcMemSize(getSerializedValue());
+     }
+   }
+   
+   @Override
+   @Released(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE)
+   public void release() {
+     @Released(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE)
+     Object vo = this.valueObj;
+     if (OffHeapHelper.releaseAndTrackOwner(vo, this)) {
+       this.valueObj = null;
+       this.valueObjReleased = true;
+     }
+   }
+   
+   public static void release(@Released(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE) Object o) {
+     if (o instanceof GatewaySenderEventImpl) {
+       ((GatewaySenderEventImpl) o).release();
+     }
+   }
+ 
+   /**
+    * Make a heap copy of this off-heap event and return it.
+    * A copy only needs to be made if the event's value is stored off-heap.
+    * If it is already on the java heap then just return "this".
+    * If it was stored off-heap and is no longer available (because it was released) then return null.
+    */
+   public GatewaySenderEventImpl makeHeapCopyIfOffHeap() {
+     if (this.value != null) {
+       // we have the value stored on the heap so return this
+       return this;
+     } else {
+       Object v = this.valueObj;
+       if (v == null) {
+         if (this.valueObjReleased) {
+           // this means that the original off heap value was freed
+           return null;
+         } else {
+           return this;
+         }
+       }
 -      if (v instanceof Chunk) {
++      if (v instanceof ObjectChunk) {
+         try {
+           return makeCopy();
+         } catch (IllegalStateException ex) {
+           // this means that the original off heap value was freed
+           return null;
+         }
+       } else {
+         // the valueObj does not use refCounts so just return this.
+         return this;
+       }
+     }
+   }
+   
+   protected GatewaySenderEventImpl makeCopy() {
+     return new GatewaySenderEventImpl(this);
+   }
+ 
+   public void copyOffHeapValue() {
+     if (this.value == null) {
+       this.value = getSerializedValue();
+     }
+   }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/AddressableMemoryChunk.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/AddressableMemoryChunk.java
index 0000000,0000000..7916e1f
new file mode 100644
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/AddressableMemoryChunk.java
@@@ -1,0 -1,0 +1,29 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements.  See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.  You may obtain a copy of the License at
++ *
++ *      http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package com.gemstone.gemfire.internal.offheap;
++
++/**
++ * A memory chunk that also has an address of its memory.
++ */
++public interface AddressableMemoryChunk extends MemoryChunk {
++
++  /**
++   * Return the address of the memory of this chunk.
++   */
++  public long getMemoryAddress();
++
++}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/AddressableMemoryChunkFactory.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/AddressableMemoryChunkFactory.java
index 0000000,0000000..fa2dd78
new file mode 100644
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/AddressableMemoryChunkFactory.java
@@@ -1,0 -1,0 +1,27 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements.  See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.  You may obtain a copy of the License at
++ *
++ *      http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package com.gemstone.gemfire.internal.offheap;
++
++/**
++ * Used to create AddressableMemoryChunk instances.
++ */
++public interface AddressableMemoryChunkFactory {
++  /** Create and return an AddressableMemoryChunk.
++   * @throws OutOfMemoryError if the create fails
++   */
++  public AddressableMemoryChunk create(int size);
++}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/Fragment.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/Fragment.java
index 0000000,ef56627..d337cfc
mode 000000,100644..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/Fragment.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/Fragment.java
@@@ -1,0 -1,139 +1,139 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package com.gemstone.gemfire.internal.offheap;
+ 
+ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+ 
+ /**
+  * A fragment is a block of memory that can have chunks allocated from it.
+  * The allocations are always from the front so the free memory is always
+  * at the end. The freeIdx keeps track of the first byte of free memory in
+  * the fragment.
+  * The base memory address and the total size of a fragment never change.
+  * During compaction fragments go away and are recreated.
+  * 
+  * @author darrel
+  *
+  */
+ public class Fragment implements MemoryBlock {
 -  private static final byte FILL_BYTE = Chunk.FILL_BYTE;
++  private static final byte FILL_BYTE = ObjectChunk.FILL_BYTE;
+   private final long baseAddr;
+   private final int size;
+   @SuppressWarnings("unused")
+   private volatile int freeIdx;
+   private static AtomicIntegerFieldUpdater<Fragment> freeIdxUpdater = AtomicIntegerFieldUpdater.newUpdater(Fragment.class, "freeIdx");
+   
+   public Fragment(long addr, int size) {
+     SimpleMemoryAllocatorImpl.validateAddress(addr);
+     this.baseAddr = addr;
+     this.size = size;
+     freeIdxUpdater.set(this, 0);
+   }
+   
+   public int freeSpace() {
+     return getSize() - getFreeIndex();
+   }
+ 
+   public boolean allocate(int oldOffset, int newOffset) {
+     return freeIdxUpdater.compareAndSet(this, oldOffset, newOffset);
+   }
+ 
+   public int getFreeIndex() {
+     return freeIdxUpdater.get(this);
+   }
+ 
+   public int getSize() {
+     return this.size;
+   }
+ 
+   public long getMemoryAddress() {
+     return this.baseAddr;
+   }
+ 
+   @Override
+   public State getState() {
+     return State.UNUSED;
+   }
+ 
+   @Override
+   public MemoryBlock getNextBlock() {
+     throw new UnsupportedOperationException();
+   }
+   
+   @Override
+   public int getBlockSize() {
+     return freeSpace();
+   }
+   
+   @Override
+   public int getSlabId() {
+     throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public int getFreeListId() {
+     return -1;
+   }
+ 
+   @Override
+   public int getRefCount() {
+     return 0;
+   }
+ 
+   @Override
+   public String getDataType() {
+     return "N/A";
+   }
+ 
+   @Override
+   public boolean isSerialized() {
+     return false;
+   }
+ 
+   @Override
+   public boolean isCompressed() {
+     return false;
+   }
+ 
+   @Override
+   public Object getDataValue() {
+     return null;
+   }
+   
+   public void fill() {
+     UnsafeMemoryChunk.fill(this.baseAddr, this.size, FILL_BYTE);
+   }
+ 
+   @Override
 -  public ChunkType getChunkType() {
 -    return null;
 -  }
 -  
 -  @Override
+   public boolean equals(Object o) {
+     if (o instanceof Fragment) {
+       return getMemoryAddress() == ((Fragment) o).getMemoryAddress();
+     }
+     return false;
+   }
+   
+   @Override
+   public int hashCode() {
+     long value = this.getMemoryAddress();
+     return (int)(value ^ (value >>> 32));
+   }
++  @Override
++  public String toString() {
++    return "Fragment [baseAddr=" + baseAddr + ", size=" + size + ", freeIdx=" + freeIdx + "]";
++  }
++
+ }