You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2018/03/16 16:26:36 UTC

[geode] 01/02: GEODE-4769: optional early serialization of EntryEvent key and new value

This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git

commit be9d99da1bb9beb077e9339d0742706ba5d3b96b
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Thu Mar 15 16:31:29 2018 -0700

    GEODE-4769: optional early serialization of EntryEvent key and new value
    
    If enabled, Regions and Transactions will serialize key and value before
    updating the local cache. This prevents inconsistency between distributed
    members caused by any failure to serialize the key or value.
    
    This feature is disabled by default. To enable it, specify the system
    property geode.earlyEntryEventSerialization=true.
---
 .../geode/internal/cache/AbstractRegionMap.java    |   6 +
 .../geode/internal/cache/EntryEventImpl.java       |   2 +-
 .../internal/cache/EntryEventSerialization.java    |  87 +++++++++
 .../geode/internal/cache/InternalEntryEvent.java   |   6 +
 .../geode/internal/cache/TXCommitMessage.java      |   6 +-
 .../apache/geode/internal/cache/TXEntryState.java  | 144 ++++++--------
 .../org/apache/geode/internal/cache/TXState.java   | 136 +++++++++----
 .../org/apache/geode/internal/cache/Token.java     |  31 ++-
 .../geode/internal/lang/SystemPropertyHelper.java  |   7 +-
 .../internal/InternalPdxInstance.java}             |  11 +-
 .../apache/geode/pdx/internal/PdxInstanceImpl.java |  12 +-
 ...okenSerializationConsistencyRegressionTest.java | 216 +++++++++++++++++++++
 .../cache/EntryEventSerializationTest.java         | 201 +++++++++++++++++++
 13 files changed, 710 insertions(+), 155 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
index 68d7134..100264a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
@@ -115,6 +115,8 @@ public abstract class AbstractRegionMap
   // the region that owns this map
   private RegionMapOwner owner;
 
+  private final EntryEventSerialization entryEventSerialization = new EntryEventSerialization();
+
   protected AbstractRegionMap(InternalRegionArguments internalRegionArgs) {
     // do nothing
   }
@@ -2200,7 +2202,11 @@ public abstract class AbstractRegionMap
       final boolean ifOld, Object expectedOldValue, // only non-null if ifOld
       boolean requireOldValue, final boolean overwriteDestroyed)
       throws CacheWriterException, TimeoutException {
+
     final LocalRegion owner = _getOwner();
+
+    entryEventSerialization.serializeNewValueIfNeeded(owner, event);
+
     boolean clearOccured = false;
     if (owner == null) {
       // "fix" for bug 32440
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
index da722ce..d562d2c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
@@ -863,7 +863,7 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent,
   }
 
   @Unretained
-  protected Object basicGetNewValue() {
+  public Object basicGetNewValue() {
     generateNewValueFromBytesIfNeeded();
     Object result = this.newValue;
     if (!this.offHeapOk && isOffHeapReference(result)) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventSerialization.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventSerialization.java
new file mode 100644
index 0000000..85cb81d
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventSerialization.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.apache.geode.internal.lang.SystemPropertyHelper.EARLY_ENTRY_EVENT_SERIALIZATION;
+import static org.apache.geode.internal.lang.SystemPropertyHelper.getProductBooleanProperty;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
+import org.apache.geode.pdx.internal.InternalPdxInstance;
+
+class EntryEventSerialization {
+
+  private final boolean enabled =
+      getProductBooleanProperty(EARLY_ENTRY_EVENT_SERIALIZATION).orElse(false);
+
+  void serializeNewValueIfNeeded(final InternalRegion region, final InternalEntryEvent event) {
+    if (enabled) {
+      doWork(region, event);
+    }
+  }
+
+  private void doWork(final InternalRegion region, final InternalEntryEvent event) {
+    if (region.getScope().isLocal()) {
+      return;
+    }
+    if (region instanceof HARegion) {
+      return;
+    }
+    if (region instanceof BucketRegionQueue) {
+      return;
+    }
+    if (event.getCachedSerializedNewValue() != null) {
+      return;
+    }
+
+    Object newValue = event.basicGetNewValue();
+    if (newValue == null) {
+      return;
+    }
+    if (newValue instanceof byte[]) {
+      return;
+    }
+    if (Token.isToken(newValue)) {
+      return;
+    }
+
+    event.setCachedSerializedNewValue(toBytes(newValue));
+  }
+
+  private byte[] toBytes(Object newValue) {
+    byte[] newValueBytes;
+    if (newValue instanceof InternalPdxInstance) {
+      newValueBytes = toBytes((InternalPdxInstance) newValue);
+    } else if (newValue instanceof CachedDeserializable) {
+      newValueBytes = toBytes((CachedDeserializable) newValue);
+    } else {
+      newValueBytes = EntryEventImpl.serialize(newValue);
+    }
+    return newValueBytes;
+  }
+
+  private byte[] toBytes(final InternalPdxInstance pdxInstance) {
+    try {
+      return pdxInstance.toBytes();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  private byte[] toBytes(final CachedDeserializable cachedDeserializable) {
+    return cachedDeserializable.getSerializedValue();
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalEntryEvent.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalEntryEvent.java
index 3b0e51e..af7319b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalEntryEvent.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalEntryEvent.java
@@ -22,4 +22,10 @@ import org.apache.geode.cache.EntryEvent;
 public interface InternalEntryEvent extends EntryEvent {
 
   void setRegionEntry(RegionEntry re);
+
+  Object basicGetNewValue();
+
+  void setCachedSerializedNewValue(byte[] v);
+
+  byte[] getCachedSerializedNewValue();
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
index b12b979..98b1f2e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
@@ -1543,10 +1543,12 @@ public class TXCommitMessage extends PooledDistributionMessage
         if (!this.op.isDestroy()) {
           this.didDestroy = in.readBoolean();
           if (!this.op.isInvalidate()) {
-            boolean isToken = in.readBoolean();
-            if (isToken) {
+            boolean isTokenOrByteArray = in.readBoolean();
+            if (isTokenOrByteArray) {
+              // token or byte[]
               this.value = DataSerializer.readObject(in);
             } else {
+              // CachedDeserializable, Object, or PDX
               this.value = CachedDeserializableFactory.create(DataSerializer.readByteArray(in),
                   GemFireCacheImpl.getInstance());
             }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXEntryState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXEntryState.java
index 598b72f..6f2a22d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXEntryState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXEntryState.java
@@ -12,7 +12,6 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-
 package org.apache.geode.internal.cache;
 
 import static org.apache.geode.internal.offheap.annotations.OffHeapIdentifier.TX_ENTRY_STATE;
@@ -27,7 +26,18 @@ import java.util.Set;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.DataSerializer;
-import org.apache.geode.cache.*;
+import org.apache.geode.cache.CacheRuntimeException;
+import org.apache.geode.cache.CacheWriter;
+import org.apache.geode.cache.CacheWriterException;
+import org.apache.geode.cache.CommitConflictException;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.EntryDestroyedException;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionDestroyedException;
+import org.apache.geode.cache.TimeoutException;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.Assert;
@@ -49,9 +59,7 @@ import org.apache.geode.pdx.PdxSerializationException;
  * TXEntryState is the entity that tracks transactional changes, except for those tracked by
  * {@link TXEntryUserAttrState}, to an entry.
  *
- *
  * @since GemFire 4.0
- *
  */
 public class TXEntryState implements Releasable {
   private static final Logger logger = LogService.getLogger();
@@ -83,6 +91,8 @@ public class TXEntryState implements Releasable {
 
   private Object pendingValue;
 
+  private byte[] serializedPendingValue;
+
   /**
    * Remember the callback argument for listener invocation
    */
@@ -173,7 +183,7 @@ public class TXEntryState implements Releasable {
   private static final boolean DETECT_READ_CONFLICTS =
       Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "detectReadConflicts");
 
-  // @todo darrel: optimize footprint by having this field on a subclass
+  // TODO: optimize footprint by having this field on a subclass
   // that is only created by TXRegionState when it knows its region needs refCounts.
   /**
    * A reference to the RegionEntry, in committed state, that this tx entry has referenced. Note:
@@ -241,7 +251,6 @@ public class TXEntryState implements Releasable {
     this.refCountEntry = null;
   }
 
-
   private TXRegionState txRegionState = null;
 
   /**
@@ -274,6 +283,34 @@ public class TXEntryState implements Releasable {
     }
   }
 
+  private byte[] getSerializedPendingValue() {
+    return serializedPendingValue;
+  }
+
+  private void setSerializedPendingValue(byte[] serializedPendingValue) {
+    this.serializedPendingValue = serializedPendingValue;
+  }
+
+  void serializePendingValue() {
+    Object pendingValue = getPendingValue();
+    if (Token.isInvalidOrRemoved(pendingValue)) {
+      // token
+      return;
+    }
+    if (pendingValue instanceof byte[]) {
+      // byte[]
+      return;
+    }
+
+    // CachedDeserializable, Object or PDX
+    if (pendingValue instanceof CachedDeserializable) {
+      CachedDeserializable cachedDeserializable = (CachedDeserializable) pendingValue;
+      setSerializedPendingValue(cachedDeserializable.getSerializedValue());
+    } else {
+      setSerializedPendingValue(EntryEventImpl.serialize(pendingValue));
+    }
+  }
+
   public TXRegionState getTXRegionState() {
     return txRegionState;
   }
@@ -412,7 +449,6 @@ public class TXEntryState implements Releasable {
   }
 
   /**
-   * @param key
    * @return the value, or null if the value does not exist in the cache, Token.INVALID or
    *         Token.LOCAL_INVALID if the value is invalid
    */
@@ -482,33 +518,10 @@ public class TXEntryState implements Releasable {
     return isOpPutEvent() || isOpCreateEvent() || isOpInvalidateEvent() || isOpDestroyEvent(r);
   }
 
-  boolean isOpSearchOrLoad() {
-    return this.op >= OP_SEARCH_CREATE && this.op != OP_PUT && this.op != OP_LOCAL_CREATE;
-  }
-
   boolean isOpSearch() {
     return this.op == OP_SEARCH_CREATE || this.op == OP_SEARCH_PUT;
   }
 
-  boolean isOpLocalLoad() {
-    return this.op == OP_LLOAD_CREATE || this.op == OP_LLOAD_PUT;
-  }
-
-  boolean isOpNetLoad() {
-    return this.op == OP_NLOAD_CREATE || this.op == OP_NLOAD_PUT;
-  }
-
-  boolean isOpLoad() {
-    return isOpLocalLoad() || isOpNetLoad();
-  }
-
-  // private boolean isLocalEventDistributed()
-  // {
-  // return this.op == OP_D_DESTROY
-  // || (this.op >= OP_D_INVALIDATE && this.op != OP_SEARCH_CREATE
-  // && this.op != OP_LOCAL_CREATE && this.op != OP_SEARCH_PUT);
-  // }
-
   String opToString() {
     return opToString(this.op);
   }
@@ -879,15 +892,8 @@ public class TXEntryState implements Releasable {
     }
   }
 
-  // private void dumpOp() {
-  // System.out.println("DEBUG: op=" + opToString()
-  // + " destroy=" + this.destroy
-  // + " isDis=" + isLocalEventDistributed());
-  // System.out.flush();
-  // }
   @Retained
   EntryEvent getEvent(LocalRegion r, Object key, TXState txs) {
-    // dumpOp();
     LocalRegion eventRegion = r;
     if (r.isUsedForPartitionedRegionBucket()) {
       eventRegion = r.getPartitionedRegion();
@@ -917,11 +923,6 @@ public class TXEntryState implements Releasable {
    * @return true if invalidate was done
    */
   public boolean invalidate(EntryEventImpl event) throws EntryNotFoundException {
-    // LocalRegion lr = event.getRegion();
-    // boolean isProxy = lr.isProxy();
-    // if (!isLocallyValid(isProxy)) {
-    // return false;
-    // }
     if (event.isLocalInvalid()) {
       performOp(adviseOp(OP_L_INVALIDATE, event), event);
     } else {
@@ -1026,8 +1027,6 @@ public class TXEntryState implements Releasable {
    * We will try to establish TXState on members with dataPolicy REPLICATE, this is done for the
    * first region to be involved in a transaction. For subsequent region if the dataPolicy is not
    * REPLICATE, we fetch the VersionTag from replicate members.
-   *
-   * @param event
    */
   private void fetchRemoteVersionTag(EntryEventImpl event) {
     if (event.getRegion() instanceof DistributedRegion) {
@@ -1803,6 +1802,7 @@ public class TXEntryState implements Releasable {
     }
   }
 
+
   /**
    * @return returns {@link Operation#PUTALL_CREATE} if the operation is a result of bulk op,
    *         {@link Operation#CREATE} otherwise
@@ -1842,6 +1842,9 @@ public class TXEntryState implements Releasable {
    * Serializes this entry state to a data output stream for a far side consumer. Make sure this
    * method is backwards compatible if changes are made.
    *
+   * <p>
+   * The matching deserialization is TXCommitMessage$RegionCommit$FarSideEntryOp#fromData.
+   *
    * @param largeModCount true if modCount needs to be represented by an int; false if a byte is
    *        enough
    * @param sendVersionTag true if versionTag should be sent to clients 7.0 and above
@@ -1874,13 +1877,15 @@ public class TXEntryState implements Releasable {
     if (!operation.isDestroy()) {
       out.writeBoolean(didDistributedDestroy());
       if (!operation.isInvalidate()) {
-        boolean sendObject = Token.isInvalidOrRemoved(getPendingValue());
-        sendObject = sendObject || getPendingValue() instanceof byte[];
-        out.writeBoolean(sendObject);
-        if (sendObject) {
+        boolean isTokenOrByteArray = Token.isInvalidOrRemoved(getPendingValue());
+        isTokenOrByteArray = isTokenOrByteArray || getPendingValue() instanceof byte[];
+        out.writeBoolean(isTokenOrByteArray);
+        if (isTokenOrByteArray) {
+          // this is a token or byte[] only
           DataSerializer.writeObject(getPendingValue(), out);
         } else {
-          DataSerializer.writeObjectAsByteArray(getPendingValue(), out);
+          // this is a CachedDeserializable, Object or PDX
+          DataSerializer.writeByteArray(getSerializedPendingValue(), out);
         }
       }
     }
@@ -1890,28 +1895,6 @@ public class TXEntryState implements Releasable {
     return filterRoutingInfo;
   }
 
-  /**
-   * Creates a queued op and returns it for this entry on the far side of the tx.
-   *
-   * @param key the key for this op
-   * @since GemFire 5.0
-   */
-  QueuedOperation toFarSideQueuedOp(Object key) {
-    Operation operation = getFarSideOperation();
-    byte[] valueBytes = null;
-    byte deserializationPolicy = DistributedCacheOperation.DESERIALIZATION_POLICY_NONE;
-    if (!operation.isDestroy() && !operation.isInvalidate()) {
-      Object v = getPendingValue();
-      if (v == null || v instanceof byte[]) {
-        valueBytes = (byte[]) v;
-      } else {
-        deserializationPolicy = DistributedCacheOperation.DESERIALIZATION_POLICY_LAZY;
-        valueBytes = EntryEventImpl.serialize(v);
-      }
-    }
-    return new QueuedOperation(operation, key, valueBytes, null, deserializationPolicy, null);
-  }
-
   void cleanup(LocalRegion r) {
     if (this.refCountEntry != null) {
       r.txDecRefCount(refCountEntry);
@@ -1937,8 +1920,6 @@ public class TXEntryState implements Releasable {
      */
     @Retained
     TxEntryEventImpl(LocalRegion r, Object key) {
-      // TODO:ASIF :Check if the eventID should be created. Currently not
-      // creating it
       super(r, getNearSideOperation(), key, getNearSidePendingValue(),
           TXEntryState.this.getCallbackArgument(), false, r.getMyId(), true/* generateCallbacks */,
           true /* initializeId */);
@@ -1969,7 +1950,6 @@ public class TXEntryState implements Releasable {
     }
   }
 
-
   private static final TXEntryStateFactory factory = new TXEntryStateFactory() {
 
     public TXEntryState createEntry() {
@@ -2027,14 +2007,6 @@ public class TXEntryState implements Releasable {
     release();
   }
 
-  public void setNextRegionVersion(long v) {
-    this.nextRegionVersion = v;
-  }
-
-  public long getNextRegionVersion() {
-    return this.nextRegionVersion;
-  }
-
   @Override
   public String toString() {
     StringBuilder str = new StringBuilder();
@@ -2054,13 +2026,12 @@ public class TXEntryState implements Releasable {
 
   /**
    * For Distributed Transaction Usage
-   *
+   * <p>
    * This class is used to bring relevant information for DistTxEntryEvent from primary, after end
    * of precommit. Same information are sent to all replicates during commit.
-   *
-   * Whereas @see DistTxEntryEvent is used forstoring entry event information on TxCordinator and
+   * <p>
+   * Whereas DistTxEntryEvent is used for storing entry event information on TxCoordinator and
    * carry same to replicates.
-   *
    */
   public static class DistTxThinEntryState implements DataSerializableFixedID {
 
@@ -2073,7 +2044,6 @@ public class TXEntryState implements Releasable {
 
     @Override
     public Version[] getSerializationVersions() {
-      // TODO Auto-generated method stub
       return null;
     }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index cf35e03..61520d9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -193,6 +193,7 @@ public class TXState implements TXStateInterface {
    *
    * @see org.apache.geode.internal.cache.TXStateInterface#getTransactionId()
    */
+  @Override
   public TransactionId getTransactionId() {
     return this.proxy.getTxId();
   }
@@ -229,10 +230,12 @@ public class TXState implements TXStateInterface {
    * org.apache.geode.internal.cache.TXStateInterface#readRegion(org.apache.geode.internal.cache.
    * LocalRegion)
    */
+  @Override
   public TXRegionState readRegion(LocalRegion r) {
     return this.regions.get(r);
   }
 
+  @Override
   public void rmRegion(LocalRegion r) {
     TXRegionState txr = this.regions.remove(r);
     if (txr != null) {
@@ -247,6 +250,7 @@ public class TXState implements TXStateInterface {
    * org.apache.geode.internal.cache.TXStateInterface#writeRegion(org.apache.geode.internal.cache.
    * LocalRegion)
    */
+  @Override
   public TXRegionState writeRegion(LocalRegion r) {
     TXRegionState result = readRegion(r);
     if (result == null) {
@@ -269,6 +273,7 @@ public class TXState implements TXStateInterface {
    *
    * @see org.apache.geode.internal.cache.TXStateInterface#getBeginTime()
    */
+  @Override
   public long getBeginTime() {
     return this.beginTime;
   }
@@ -278,6 +283,7 @@ public class TXState implements TXStateInterface {
    *
    * @see org.apache.geode.internal.cache.TXStateInterface#getChanges()
    */
+  @Override
   public int getChanges() {
     int changes = 0;
     Iterator<TXRegionState> it = this.regions.values().iterator();
@@ -293,6 +299,7 @@ public class TXState implements TXStateInterface {
    *
    * @see org.apache.geode.internal.cache.TXStateInterface#isInProgress()
    */
+  @Override
   public boolean isInProgress() {
     return !this.closed;
   }
@@ -302,6 +309,7 @@ public class TXState implements TXStateInterface {
    *
    * @see org.apache.geode.internal.cache.TXStateInterface#nextModSerialNum()
    */
+  @Override
   public int nextModSerialNum() {
     this.modSerialNum += 1;
     return this.modSerialNum;
@@ -312,6 +320,7 @@ public class TXState implements TXStateInterface {
    *
    * @see org.apache.geode.internal.cache.TXStateInterface#needsLargeModCount()
    */
+  @Override
   public boolean needsLargeModCount() {
     return this.modSerialNum > Byte.MAX_VALUE;
   }
@@ -360,6 +369,7 @@ public class TXState implements TXStateInterface {
    *
    * @see org.apache.geode.internal.cache.TXStateInterface#commit()
    */
+  @Override
   public void commit() throws CommitConflictException {
     if (this.closed) {
       return;
@@ -564,6 +574,7 @@ public class TXState implements TXStateInterface {
    *
    * @see org.apache.geode.internal.cache.TXStateInterface#rollback()
    */
+  @Override
   public void rollback() {
     if (this.closed) {
       return;
@@ -775,46 +786,47 @@ public class TXState implements TXStateInterface {
    * applies this transaction to the cache.
    */
   protected void applyChanges(List/* <TXEntryStateWithRegionAndKey> */ entries) {
-    {
-      Iterator<Map.Entry<LocalRegion, TXRegionState>> it = this.regions.entrySet().iterator();
-      while (it.hasNext()) {
-        Map.Entry<LocalRegion, TXRegionState> me = it.next();
-        LocalRegion r = me.getKey();
-        TXRegionState txrs = me.getValue();
-        txrs.applyChangesStart(r, this);
-      }
+    // applyChangesStart for each region
+    for (Map.Entry<LocalRegion, TXRegionState> me : this.regions.entrySet()) {
+      LocalRegion r = me.getKey();
+      TXRegionState txrs = me.getValue();
+      txrs.applyChangesStart(r, this);
     }
-    {
-      Iterator/* <TXEntryStateWithRegionAndKey> */ it = entries.iterator();
-      while (it.hasNext()) {
-        TXEntryStateWithRegionAndKey o = (TXEntryStateWithRegionAndKey) it.next();
-        if (this.internalDuringApplyChanges != null) {
-          this.internalDuringApplyChanges.run();
-        }
-        try {
-          o.es.applyChanges(o.r, o.key, this);
-        } catch (RegionDestroyedException ex) {
-          // region was destroyed out from under us; after conflict checking
-          // passed. So act as if the region destroy happened right after the
-          // commit. We act this way by doing nothing; including distribution
-          // of this region's commit data.
-        } catch (CancelException ex) {
-          // cache was closed out from under us; after conflict checking
-          // passed. So do nothing.
-        }
-      }
+
+    // serializePendingValue for each entry
+    for (Object entry : entries) {
+      TXEntryStateWithRegionAndKey o = (TXEntryStateWithRegionAndKey) entry;
+      o.es.serializePendingValue();
     }
-    {
-      Iterator<Map.Entry<LocalRegion, TXRegionState>> it = this.regions.entrySet().iterator();
-      while (it.hasNext()) {
-        Map.Entry<LocalRegion, TXRegionState> me = it.next();
-        LocalRegion r = me.getKey();
-        TXRegionState txrs = me.getValue();
-        txrs.applyChangesEnd(r, this);
+
+    // applyChanges for each entry
+    for (Object entry : entries) {
+      TXEntryStateWithRegionAndKey o = (TXEntryStateWithRegionAndKey) entry;
+      if (this.internalDuringApplyChanges != null) {
+        this.internalDuringApplyChanges.run();
+      }
+      try {
+        o.es.applyChanges(o.r, o.key, this);
+      } catch (RegionDestroyedException ex) {
+        // region was destroyed out from under us; after conflict checking
+        // passed. So act as if the region destroy happened right after the
+        // commit. We act this way by doing nothing; including distribution
+        // of this region's commit data.
+      } catch (CancelException ex) {
+        // cache was closed out from under us; after conflict checking
+        // passed. So do nothing.
       }
     }
+
+    // applyChangesEnd for each region
+    for (Map.Entry<LocalRegion, TXRegionState> me : this.regions.entrySet()) {
+      LocalRegion r = me.getKey();
+      TXRegionState txrs = me.getValue();
+      txrs.applyChangesEnd(r, this);
+    }
   }
 
+  @Override
   public TXEvent getEvent() {
     return new TXEvent(this, getCache());
   }
@@ -900,6 +912,7 @@ public class TXState implements TXStateInterface {
    *
    * @see org.apache.geode.internal.cache.TXStateInterface#getEvents()
    */
+  @Override
   public List getEvents() {
     ArrayList events = new ArrayList();
     Iterator<Map.Entry<LocalRegion, TXRegionState>> it = this.regions.entrySet().iterator();
@@ -955,6 +968,7 @@ public class TXState implements TXStateInterface {
       return this.es.getSortValue();
     }
 
+    @Override
     public int compareTo(Object o) {
       TXEntryStateWithRegionAndKey other = (TXEntryStateWithRegionAndKey) o;
       return getSortValue() - other.getSortValue();
@@ -981,6 +995,7 @@ public class TXState implements TXStateInterface {
    *
    * @see org.apache.geode.internal.cache.TXStateInterface#beforeCompletion()
    */
+  @Override
   public void beforeCompletion() throws SynchronizationCommitConflictException {
     if (this.closed) {
       throw new TXManagerCancelledException();
@@ -1040,6 +1055,7 @@ public class TXState implements TXStateInterface {
    *
    * @see org.apache.geode.internal.cache.TXStateInterface#afterCompletion(int)
    */
+  @Override
   public void afterCompletion(int status) {
     // System.err.println("start afterCompletion");
     final long opStart = CachePerfStats.getStatTime();
@@ -1180,6 +1196,7 @@ public class TXState implements TXStateInterface {
    *
    * @see org.apache.geode.internal.cache.TXStateInterface#getCache()
    */
+  @Override
   public InternalCache getCache() {
     return this.proxy.getCache();
   }
@@ -1189,15 +1206,18 @@ public class TXState implements TXStateInterface {
    *
    * @see org.apache.geode.internal.cache.TXStateInterface#getRegions()
    */
+  @Override
   public Collection<LocalRegion> getRegions() {
     return this.regions.keySet();
   }
 
+  @Override
   public TXRegionState txWriteRegion(final LocalRegion localRegion, final KeyInfo entryKey) {
     LocalRegion lr = localRegion.getDataRegionForWrite(entryKey);
     return writeRegion(lr);
   }
 
+  @Override
   public TXRegionState txReadRegion(LocalRegion localRegion) {
     return readRegion(localRegion);
   }
@@ -1244,6 +1264,7 @@ public class TXState implements TXStateInterface {
    * this version of txPutEntry takes a ConcurrentMap expectedOldValue parameter. If not null, this
    * value must match the current value of the entry or false is returned
    */
+  @Override
   public boolean txPutEntry(final EntryEventImpl event, boolean ifNew, boolean requireOldValue,
       boolean checkResources, Object expectedOldValue) {
 
@@ -1290,10 +1311,11 @@ public class TXState implements TXStateInterface {
    * @see org.apache.geode.internal.cache.TXStateInterface#containsValueForKey(java.lang.Object,
    * org.apache.geode.internal.cache.LocalRegion)
    */
+  @Override
   public boolean containsValueForKey(KeyInfo keyInfo, LocalRegion region) {
     TXEntryState tx = txReadEntry(keyInfo, region, true, true/* create txEntry is absent */);
     if (tx != null) {
-      /**
+      /*
        * Note that we don't consult this.getDataPolicy().isProxy() when setting this because in this
        * context we don't want proxies to pretend they have a value.
        */
@@ -1311,6 +1333,7 @@ public class TXState implements TXStateInterface {
    * org.apache.geode.internal.cache.TXStateInterface#destroyExistingEntry(org.apache.geode.internal
    * .cache.EntryEventImpl, boolean, java.lang.Object)
    */
+  @Override
   public void destroyExistingEntry(final EntryEventImpl event, final boolean cacheWrite,
       Object expectedOldValue) {
     if (bridgeContext == null) {
@@ -1335,6 +1358,7 @@ public class TXState implements TXStateInterface {
    * @see org.apache.geode.internal.cache.TXStateInterface#invalidateExistingEntry(org.apache.geode.
    * internal.cache.EntryEventImpl, boolean, boolean)
    */
+  @Override
   public void invalidateExistingEntry(final EntryEventImpl event, boolean invokeCallbacks,
       boolean forceNewEntry) {
     if (bridgeContext == null) {
@@ -1353,10 +1377,7 @@ public class TXState implements TXStateInterface {
    * Write an existing entry. This form takes an expectedOldValue which, if not null, must be equal
    * to the current value of the entry. If it is not, an EntryNotFoundException is thrown.
    *
-   * @param event
-   * @param expectedOldValue
    * @return the tx entry object
-   * @throws EntryNotFoundException
    */
   private TXEntryState txWriteExistingEntry(final EntryEventImpl event, Object expectedOldValue)
       throws EntryNotFoundException {
@@ -1393,6 +1414,7 @@ public class TXState implements TXStateInterface {
    * @see org.apache.geode.internal.cache.TXStateInterface#getEntry(java.lang.Object,
    * org.apache.geode.internal.cache.LocalRegion)
    */
+  @Override
   public Entry getEntry(final KeyInfo keyInfo, final LocalRegion region, boolean allowTombstones) {
     TXEntryState tx = txReadEntry(keyInfo, region, true, true/* create txEntry is absent */);
     if (tx != null && tx.existsLocally()) {
@@ -1402,6 +1424,7 @@ public class TXState implements TXStateInterface {
     }
   }
 
+  @Override
   public Entry accessEntry(KeyInfo keyInfo, LocalRegion localRegion) {
     return getEntry(keyInfo, localRegion, false);
   }
@@ -1411,14 +1434,13 @@ public class TXState implements TXStateInterface {
   }
 
   /**
-   * @param keyInfo
-   * @param localRegion
    * @param rememberRead true if the value read from committed state needs to be remembered in tx
    *        state for repeatable read.
    * @param createIfAbsent should a transactional entry be created if not present.
    * @return a txEntryState or null if the entry doesn't exist in the transaction and/or committed
    *         state.
    */
+  @Override
   public TXEntryState txReadEntry(KeyInfo keyInfo, LocalRegion localRegion, boolean rememberRead,
       boolean createIfAbsent) {
     localRegion.cache.getCancelCriterion().checkCancelInProgress(null);
@@ -1490,6 +1512,7 @@ public class TXState implements TXStateInterface {
    * @see org.apache.geode.internal.cache.TXStateInterface#getDeserializedValue(java.lang.Object,
    * org.apache.geode.internal.cache.LocalRegion, boolean)
    */
+  @Override
   public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion, boolean updateStats,
       boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent,
       boolean returnTombstones, boolean retainResult) {
@@ -1513,6 +1536,7 @@ public class TXState implements TXStateInterface {
    * org.apache.geode.internal.cache.InternalDataView#getSerializedValue(org.apache.geode.internal.
    * cache.LocalRegion, java.lang.Object, java.lang.Object)
    */
+  @Override
   @Retained
   public Object getSerializedValue(LocalRegion localRegion, KeyInfo keyInfo, boolean doNotLockEntry,
       ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent,
@@ -1543,6 +1567,7 @@ public class TXState implements TXStateInterface {
    * org.apache.geode.internal.cache.InternalDataView#entryCount(org.apache.geode.internal.cache.
    * LocalRegion)
    */
+  @Override
   public int entryCount(LocalRegion localRegion) {
     int result = localRegion.getRegionSize();
     TXRegionState txr = txReadRegion(localRegion);
@@ -1565,6 +1590,7 @@ public class TXState implements TXStateInterface {
    * @see org.apache.geode.internal.cache.TXStateInterface#containsKey(java.lang.Object,
    * org.apache.geode.internal.cache.LocalRegion)
    */
+  @Override
   public boolean containsKey(KeyInfo keyInfo, LocalRegion localRegion) {
     TXEntryState tx = txReadEntry(keyInfo, localRegion, true, true/* create txEntry is absent */);
     if (tx != null) {
@@ -1580,6 +1606,7 @@ public class TXState implements TXStateInterface {
    * @see org.apache.geode.internal.cache.TXStateInterface#getValueInVM(java.lang.Object,
    * org.apache.geode.internal.cache.LocalRegion, boolean)
    */
+  @Override
   @Retained
   public Object getValueInVM(KeyInfo keyInfo, LocalRegion localRegion, boolean rememberRead) {
     TXEntryState tx =
@@ -1596,6 +1623,7 @@ public class TXState implements TXStateInterface {
    * @see org.apache.geode.internal.cache.TXStateInterface#putEntry(org.apache.geode.internal.cache.
    * EntryEventImpl, boolean, boolean, java.lang.Object, boolean, long, boolean)
    */
+  @Override
   public boolean putEntry(EntryEventImpl event, boolean ifNew, boolean ifOld,
       Object expectedOldValue, boolean requireOldValue, long lastModified,
       boolean overwriteDestroyed) {
@@ -1605,8 +1633,6 @@ public class TXState implements TXStateInterface {
 
   /**
    * throws an exception when cloning is disabled while using delta
-   *
-   * @param event
    */
   private void validateDelta(EntryEventImpl event) {
     if (event.getDeltaBytes() != null && !event.getRegion().getAttributes().getCloningEnabled()) {
@@ -1620,6 +1646,7 @@ public class TXState implements TXStateInterface {
    *
    * @see org.apache.geode.internal.cache.InternalDataView#isStatsDeferred()
    */
+  @Override
   public boolean isDeferredStats() {
     return true;
   }
@@ -1631,6 +1658,7 @@ public class TXState implements TXStateInterface {
    * org.apache.geode.internal.cache.TXStateInterface#findObject(org.apache.geode.internal.cache.
    * LocalRegion, java.lang.Object, java.lang.Object, boolean, boolean, java.lang.Object)
    */
+  @Override
   public Object findObject(KeyInfo key, LocalRegion r, boolean isCreate, boolean generateCallbacks,
       Object value, boolean disableCopyOnRead, boolean preferCD,
       ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent,
@@ -1660,6 +1688,7 @@ public class TXState implements TXStateInterface {
    * org.apache.geode.internal.cache.TXStateInterface#getEntryForIterator(org.apache.geode.internal.
    * cache.LocalRegion, java.lang.Object, boolean)
    */
+  @Override
   public Object getEntryForIterator(KeyInfo curr, LocalRegion currRgn, boolean rememberReads,
       boolean allowTombstones) {
     if (currRgn instanceof PartitionedRegion) {
@@ -1691,6 +1720,7 @@ public class TXState implements TXStateInterface {
    * @see org.apache.geode.internal.cache.InternalDataView#getKeyForIterator(java.lang.Object,
    * org.apache.geode.internal.cache.LocalRegion, boolean)
    */
+  @Override
   public Object getKeyForIterator(KeyInfo curr, LocalRegion currRgn, boolean rememberReads,
       boolean allowTombstones) {
     assert !(curr.getKey() instanceof RegionEntry);
@@ -1708,6 +1738,7 @@ public class TXState implements TXStateInterface {
    * org.apache.geode.internal.cache.InternalDataView#getAdditionalKeysForIterator(org.apache.geode.
    * internal.cache.LocalRegion)
    */
+  @Override
   public Set getAdditionalKeysForIterator(LocalRegion currRgn) {
     if (currRgn instanceof PartitionedRegion) {
       final HashSet ret = new HashSet();
@@ -1739,6 +1770,7 @@ public class TXState implements TXStateInterface {
    * @see org.apache.geode.internal.cache.TXStateInterface#isInProgressAndSameAs(org.apache.geode.
    * internal.cache.TXStateInterface)
    */
+  @Override
   public boolean isInProgressAndSameAs(TXStateInterface otherState) {
     return isInProgress() && otherState == this;
   }
@@ -1751,6 +1783,7 @@ public class TXState implements TXStateInterface {
    * org.apache.geode.internal.cache.InternalDataView#putEntryOnRemote(org.apache.geode.internal.
    * cache.EntryEventImpl, boolean, boolean, java.lang.Object, boolean, long, boolean)
    */
+  @Override
   public boolean putEntryOnRemote(EntryEventImpl event, boolean ifNew, boolean ifOld,
       Object expectedOldValue, boolean requireOldValue, long lastModified,
       boolean overwriteDestroyed) throws DataLocationException {
@@ -1763,6 +1796,7 @@ public class TXState implements TXStateInterface {
   }
 
 
+  @Override
   public boolean isFireCallbacks() {
     return !getEvent().hasOnlyInternalEvents();
   }
@@ -1771,23 +1805,27 @@ public class TXState implements TXStateInterface {
     return onBehalfOfRemoteStub || this.proxy.isOnBehalfOfClient();
   }
 
+  @Override
   public void destroyOnRemote(EntryEventImpl event, boolean cacheWrite, Object expectedOldValue)
       throws DataLocationException {
     event.setOriginRemote(true);
     destroyExistingEntry(event, cacheWrite, expectedOldValue);
   }
 
+  @Override
   public void invalidateOnRemote(EntryEventImpl event, boolean invokeCallbacks,
       boolean forceNewEntry) throws DataLocationException {
     event.setOriginRemote(true);
     invalidateExistingEntry(event, invokeCallbacks, forceNewEntry);
   }
 
+  @Override
   public void checkSupportsRegionDestroy() throws UnsupportedOperationInTransactionException {
     throw new UnsupportedOperationInTransactionException(
         LocalizedStrings.TXState_REGION_DESTROY_NOT_SUPPORTED_IN_A_TRANSACTION.toLocalizedString());
   }
 
+  @Override
   public void checkSupportsRegionInvalidate() throws UnsupportedOperationInTransactionException {
     throw new UnsupportedOperationInTransactionException(
         LocalizedStrings.TXState_REGION_INVALIDATE_NOT_SUPPORTED_IN_A_TRANSACTION
@@ -1807,6 +1845,7 @@ public class TXState implements TXStateInterface {
    * org.apache.geode.internal.cache.InternalDataView#getBucketKeys(org.apache.geode.internal.cache.
    * LocalRegion, int)
    */
+  @Override
   public Set getBucketKeys(LocalRegion localRegion, int bucketId, boolean allowTombstones) {
     PartitionedRegion pr = (PartitionedRegion) localRegion;
     return pr.getBucketKeys(bucketId, allowTombstones);
@@ -1818,6 +1857,7 @@ public class TXState implements TXStateInterface {
    * @see org.apache.geode.internal.cache.InternalDataView#getEntryOnRemote(java.lang.Object,
    * org.apache.geode.internal.cache.LocalRegion)
    */
+  @Override
   public Entry getEntryOnRemote(KeyInfo key, LocalRegion localRegion, boolean allowTombstones)
       throws DataLocationException {
     PartitionedRegion pr = (PartitionedRegion) localRegion;
@@ -1837,6 +1877,7 @@ public class TXState implements TXStateInterface {
    *
    * @see org.apache.geode.internal.cache.TXStateInterface#getSemaphore()
    */
+  @Override
   public ReentrantLock getLock() {
     return proxy.getLock();
   }
@@ -1848,15 +1889,18 @@ public class TXState implements TXStateInterface {
    * org.apache.geode.internal.cache.InternalDataView#getRegionKeysForIteration(org.apache.geode.
    * internal.cache.LocalRegion)
    */
+  @Override
   public Set getRegionKeysForIteration(LocalRegion currRegion) {
     return currRegion.getRegionKeysForIteration();
   }
 
 
+  @Override
   public boolean isRealDealLocal() {
     return true;
   }
 
+  @Override
   public InternalDistributedMember getOriginatingMember() {
     /*
      * State will never fwd on to other nodes so this is not relevant
@@ -1864,6 +1908,7 @@ public class TXState implements TXStateInterface {
     return null;
   }
 
+  @Override
   public boolean isMemberIdForwardingRequired() {
     /*
      * State will never fwd on to other nodes so this is not relevant
@@ -1871,6 +1916,7 @@ public class TXState implements TXStateInterface {
     return false;
   }
 
+  @Override
   public TXCommitMessage getCommitMessage() {
     return commitMessage;
   }
@@ -1883,6 +1929,7 @@ public class TXState implements TXStateInterface {
    * org.apache.geode.internal.cache.InternalDataView#postPutAll(org.apache.geode.internal.cache.
    * DistributedPutAllOperation, java.util.Map, org.apache.geode.internal.cache.LocalRegion)
    */
+  @Override
   public void postPutAll(final DistributedPutAllOperation putallOp,
       final VersionedObjectList successfulPuts, LocalRegion reg) {
 
@@ -1902,6 +1949,7 @@ public class TXState implements TXStateInterface {
      * We need to put this into the tx state.
      */
     theRegion.syncBulkOp(new Runnable() {
+      @Override
       public void run() {
         // final boolean requiresRegionContext = theRegion.keyRequiresRegionContext();
         InternalDistributedMember myId =
@@ -1939,6 +1987,7 @@ public class TXState implements TXStateInterface {
      * will push them out. We need to put this into the tx state.
      */
     theRegion.syncBulkOp(new Runnable() {
+      @Override
       public void run() {
         InternalDistributedMember myId =
             theRegion.getDistributionManager().getDistributionManagerId();
@@ -1961,14 +2010,17 @@ public class TXState implements TXStateInterface {
 
   }
 
+  @Override
   public void suspend() {
     // no special tasks to perform
   }
 
+  @Override
   public void resume() {
     // no special tasks to perform
   }
 
+  @Override
   public void recordTXOperation(ServerRegionDataAccess region, ServerRegionOperation op, Object key,
       Object arguments[]) {
     // no-op here
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/Token.java b/geode-core/src/main/java/org/apache/geode/internal/cache/Token.java
index 0d29451..9fa5759 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/Token.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/Token.java
@@ -12,7 +12,6 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-
 package org.apache.geode.internal.cache;
 
 import java.io.DataInput;
@@ -29,9 +28,9 @@ import org.apache.geode.internal.Version;
 /**
  * Internal tokens used as region values. These tokens are never seen from the public API.
  *
+ * <p>
  * These classes are Serializable and implement readResolve to support canonicalization in the face
  * of copysharing.
- *
  */
 public abstract class Token {
 
@@ -108,6 +107,10 @@ public abstract class Token {
     return o == DESTROYED;
   }
 
+  public static boolean isToken(Object o) {
+    return o instanceof Token;
+  }
+
   /**
    * Singleton token indicating an Invalid Entry.
    */
@@ -125,12 +128,15 @@ public abstract class Token {
       return INVALID;
     }
 
+    @Override
     public int getDSFID() {
       return TOKEN_INVALID;
     }
 
+    @Override
     public void toData(DataOutput out) throws IOException {}
 
+    @Override
     public void fromData(DataInput in) throws IOException, ClassNotFoundException {}
 
     public boolean isSerializedValue(byte[] value) {
@@ -141,7 +147,6 @@ public abstract class Token {
 
     @Override
     public Version[] getSerializationVersions() {
-      // TODO Auto-generated method stub
       return null;
     }
   }
@@ -180,17 +185,19 @@ public abstract class Token {
       return DESTROYED;
     }
 
+    @Override
     public int getDSFID() {
       return TOKEN_DESTROYED;
     }
 
+    @Override
     public void toData(DataOutput out) throws IOException {}
 
+    @Override
     public void fromData(DataInput in) throws IOException, ClassNotFoundException {}
 
     @Override
     public Version[] getSerializationVersions() {
-      // TODO Auto-generated method stub
       return null;
     }
   }
@@ -209,17 +216,19 @@ public abstract class Token {
       return TOMBSTONE;
     }
 
+    @Override
     public int getDSFID() {
       return TOKEN_TOMBSTONE;
     }
 
+    @Override
     public void toData(DataOutput out) throws IOException {}
 
+    @Override
     public void fromData(DataInput in) throws IOException, ClassNotFoundException {}
 
     @Override
     public Version[] getSerializationVersions() {
-      // TODO Auto-generated method stub
       return null;
     }
   }
@@ -238,17 +247,19 @@ public abstract class Token {
       return REMOVED_PHASE1;
     }
 
+    @Override
     public int getDSFID() {
       return TOKEN_REMOVED;
     }
 
+    @Override
     public void toData(DataOutput out) throws IOException {}
 
+    @Override
     public void fromData(DataInput in) throws IOException, ClassNotFoundException {}
 
     @Override
     public Version[] getSerializationVersions() {
-      // TODO Auto-generated method stub
       return null;
     }
   }
@@ -267,17 +278,19 @@ public abstract class Token {
       return REMOVED_PHASE2;
     }
 
+    @Override
     public int getDSFID() {
       return TOKEN_REMOVED2;
     }
 
+    @Override
     public void toData(DataOutput out) throws IOException {}
 
+    @Override
     public void fromData(DataInput in) throws IOException, ClassNotFoundException {}
 
     @Override
     public Version[] getSerializationVersions() {
-      // TODO Auto-generated method stub
       return null;
     }
   }
@@ -315,17 +328,19 @@ public abstract class Token {
       return END_OF_STREAM;
     }
 
+    @Override
     public int getDSFID() {
       return END_OF_STREAM_TOKEN;
     }
 
+    @Override
     public void fromData(DataInput in) throws IOException, ClassNotFoundException {}
 
+    @Override
     public void toData(DataOutput out) throws IOException {}
 
     @Override
     public Version[] getSerializationVersions() {
-      // TODO Auto-generated method stub
       return null;
     }
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java b/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java
index b26ff08..5208044 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java
@@ -22,11 +22,10 @@ import java.util.Optional;
  *
  * @since Geode 1.4.0
  */
-
 public class SystemPropertyHelper {
-  private static final String GEODE_PREFIX = "geode.";
-  private static final String GEMFIRE_PREFIX = "gemfire.";
 
+  public static final String GEODE_PREFIX = "geode.";
+  public static final String GEMFIRE_PREFIX = "gemfire.";
 
   /**
    * When set to "true" enables asynchronous eviction algorithm (defaults to true). For more details
@@ -58,6 +57,8 @@ public class SystemPropertyHelper {
 
   public static final String EVICTION_SEARCH_MAX_ENTRIES = "lru.maxSearchEntries";
 
+  public static final String EARLY_ENTRY_EVENT_SERIALIZATION = "earlyEntryEventSerialization";
+
   /**
    * This method will try to look up "geode." and "gemfire." versions of the system property. It
    * will check and prefer "geode." setting first, then try to check "gemfire." setting.
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalEntryEvent.java b/geode-core/src/main/java/org/apache/geode/pdx/internal/InternalPdxInstance.java
similarity index 76%
copy from geode-core/src/main/java/org/apache/geode/internal/cache/InternalEntryEvent.java
copy to geode-core/src/main/java/org/apache/geode/pdx/internal/InternalPdxInstance.java
index 3b0e51e..f2d895b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalEntryEvent.java
+++ b/geode-core/src/main/java/org/apache/geode/pdx/internal/InternalPdxInstance.java
@@ -12,14 +12,9 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.apache.geode.internal.cache;
+package org.apache.geode.pdx.internal;
 
-import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.pdx.PdxInstance;
 
-/**
- * All of the API methods not exposed to User in EntryEvent.
- */
-public interface InternalEntryEvent extends EntryEvent {
-
-  void setRegionEntry(RegionEntry re);
+public interface InternalPdxInstance extends PdxInstance, ConvertableToBytes {
 }
diff --git a/geode-core/src/main/java/org/apache/geode/pdx/internal/PdxInstanceImpl.java b/geode-core/src/main/java/org/apache/geode/pdx/internal/PdxInstanceImpl.java
index d429601..f55df95 100644
--- a/geode-core/src/main/java/org/apache/geode/pdx/internal/PdxInstanceImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/pdx/internal/PdxInstanceImpl.java
@@ -42,7 +42,6 @@ import org.apache.geode.internal.tcp.ByteBufferInputStream;
 import org.apache.geode.internal.tcp.ByteBufferInputStream.ByteSource;
 import org.apache.geode.internal.tcp.ByteBufferInputStream.ByteSourceFactory;
 import org.apache.geode.pdx.JSONFormatter;
-import org.apache.geode.pdx.PdxInstance;
 import org.apache.geode.pdx.PdxSerializationException;
 import org.apache.geode.pdx.WritablePdxInstance;
 
@@ -55,8 +54,7 @@ import org.apache.geode.pdx.WritablePdxInstance;
  * We do not use this normal java io serialization when serializing this class in GemFire because
  * Sendable takes precedence over Serializable.
  */
-public class PdxInstanceImpl extends PdxReaderImpl
-    implements PdxInstance, Sendable, ConvertableToBytes {
+public class PdxInstanceImpl extends PdxReaderImpl implements InternalPdxInstance, Sendable {
 
   private static final long serialVersionUID = -1669268527103938431L;
 
@@ -135,6 +133,7 @@ public class PdxInstanceImpl extends PdxReaderImpl
     }
   }
 
+  @Override
   public Object getField(String fieldName) {
     return getUnmodifiableReader(fieldName).readField(fieldName);
   }
@@ -155,7 +154,7 @@ public class PdxInstanceImpl extends PdxReaderImpl
     return writer;
   }
 
-  // Sendable implementation
+  @Override
   public void sendTo(DataOutput out) throws IOException {
     PdxReaderImpl ur = getUnmodifiableReader();
     if (ur.getPdxType().getHasDeletedField()) {
@@ -169,6 +168,7 @@ public class PdxInstanceImpl extends PdxReaderImpl
     }
   }
 
+  @Override
   public byte[] toBytes() {
     PdxReaderImpl ur = getUnmodifiableReader();
     if (ur.getPdxType().getHasDeletedField()) {
@@ -474,6 +474,7 @@ public class PdxInstanceImpl extends PdxReaderImpl
     return result.toString();
   }
 
+  @Override
   public List<String> getFieldNames() {
     return getPdxType().getFieldNames();
   }
@@ -488,6 +489,7 @@ public class PdxInstanceImpl extends PdxReaderImpl
     this.cachedObjectForm = null;
   }
 
+  @Override
   public WritablePdxInstance createWriter() {
     if (isEnum()) {
       throw new IllegalStateException("PdxInstances that are an enum can not be modified.");
@@ -617,10 +619,12 @@ public class PdxInstanceImpl extends PdxReaderImpl
     super.basicSendTo(bb);
   }
 
+  @Override
   public String getClassName() {
     return getPdxType().getClassName();
   }
 
+  @Override
   public boolean isEnum() {
     return false;
   }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BrokenSerializationConsistencyRegressionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BrokenSerializationConsistencyRegressionTest.java
new file mode 100644
index 0000000..c4b6c61
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BrokenSerializationConsistencyRegressionTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static com.googlecode.catchexception.CatchException.catchException;
+import static com.googlecode.catchexception.CatchException.caughtException;
+import static org.apache.geode.cache.RegionShortcut.REPLICATE;
+import static org.apache.geode.internal.lang.SystemPropertyHelper.EARLY_ENTRY_EVENT_SERIALIZATION;
+import static org.apache.geode.internal.lang.SystemPropertyHelper.GEODE_PREFIX;
+import static org.apache.geode.test.dunit.Host.getHost;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.DataSerializable;
+import org.apache.geode.ToDataException;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.pdx.PdxReader;
+import org.apache.geode.pdx.PdxSerializable;
+import org.apache.geode.pdx.PdxWriter;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedTestRule;
+import org.apache.geode.test.junit.categories.DistributedTest;
+
+/**
+ * Regression test verifying that, with early EntryEvent serialization enabled, a value that fails
+ * to serialize is stored neither in the local replicate region nor in the region on the other
+ * member, while values that do serialize are still readable there.
+ */
+@Category(DistributedTest.class)
+public class BrokenSerializationConsistencyRegressionTest implements Serializable {
+
+  private static final String REGION_NAME = "replicateRegion";
+  private static final String REGION_NAME2 = "replicateRegion2";
+  private static final String KEY = "key";
+
+  private VM vm0;
+
+  private transient FailsToDataSerialize valueFailsToSerialize;
+  private transient FailsToPdxSerialize pdxValueFailsToSerialize;
+  private transient String stringValue;
+  private transient PdxValue pdxValue;
+  private transient byte[] bytesValue;
+
+  @ClassRule
+  public static DistributedTestRule distributedTestRule = new DistributedTestRule();
+
+  @Rule
+  public CacheRule cacheRule = new CacheRule();
+
+  @Before
+  public void setUpAll() {
+    vm0 = getHost(0).getVM(0);
+
+    System.setProperty(GEODE_PREFIX + EARLY_ENTRY_EVENT_SERIALIZATION, "true");
+    createReplicateRegions();
+
+    vm0.invoke(() -> {
+      System.setProperty(GEODE_PREFIX + EARLY_ENTRY_EVENT_SERIALIZATION, "true");
+      createReplicateRegions();
+    });
+
+    valueFailsToSerialize = new FailsToDataSerialize();
+    pdxValueFailsToSerialize = new FailsToPdxSerialize();
+
+    stringValue = "hello world";
+    pdxValue = new PdxValue();
+    bytesValue = new byte[] {0, 1, 2};
+  }
+
+  /**
+   * Clears the early-serialization system property in this JVM and in vm0 so that the setting made
+   * in {@link #setUpAll()} does not leak into tests that run later in the same JVMs.
+   */
+  @After
+  public void tearDown() {
+    System.clearProperty(GEODE_PREFIX + EARLY_ENTRY_EVENT_SERIALIZATION);
+    vm0.invoke(() -> {
+      System.clearProperty(GEODE_PREFIX + EARLY_ENTRY_EVENT_SERIALIZATION);
+    });
+  }
+
+  @Test
+  public void pdxValuePdxSerializes() {
+    Region<String, PdxValue> region = cacheRule.getCache().getRegion(REGION_NAME);
+    region.put(KEY, pdxValue);
+
+    vm0.invoke(() -> {
+      Region<String, PdxValue> regionOnVm0 = cacheRule.getCache().getRegion(REGION_NAME);
+      assertThat(regionOnVm0.get(KEY)).isInstanceOf(PdxValue.class);
+    });
+  }
+
+  @Test
+  public void bytesValueDataSerializes() {
+    Region<String, byte[]> region = cacheRule.getCache().getRegion(REGION_NAME);
+    region.put(KEY, bytesValue);
+
+    vm0.invoke(() -> {
+      Region<String, byte[]> regionOnVm0 = cacheRule.getCache().getRegion(REGION_NAME);
+      assertThat(regionOnVm0.get(KEY)).isNotNull().isInstanceOf(byte[].class);
+    });
+  }
+
+  @Test
+  public void failureToDataSerializeFailsToPropagate() {
+    Region<String, DataSerializable> region = cacheRule.getCache().getRegion(REGION_NAME);
+    catchException(region).put(KEY, valueFailsToSerialize);
+
+    Exception caughtException = caughtException();
+    assertThat(caughtException).isInstanceOf(ToDataException.class);
+    assertThat(caughtException.getCause()).isInstanceOf(IOException.class)
+        .hasMessage("FailsToSerialize");
+    assertThat(region.get(KEY)).isNull();
+
+    vm0.invoke(() -> {
+      Region<String, DataSerializable> regionOnVm0 = cacheRule.getCache().getRegion(REGION_NAME);
+      assertThat(regionOnVm0.get(KEY)).isNull();
+    });
+  }
+
+  @Test
+  public void failureToDataSerializeFailsToPropagateInTransaction() {
+    Region<String, DataSerializable> region = cacheRule.getCache().getRegion(REGION_NAME);
+    Region<String, String> region2 = cacheRule.getCache().getRegion(REGION_NAME2);
+    TXManagerImpl txManager = cacheRule.getCache().getTxManager();
+    txManager.begin();
+    region2.put(KEY, stringValue);
+    region.put(KEY, valueFailsToSerialize);
+    catchException(txManager).commit();
+
+    Exception caughtException = caughtException();
+    assertThat(caughtException).isInstanceOf(ToDataException.class);
+    assertThat(caughtException.getCause()).isInstanceOf(IOException.class)
+        .hasMessage("FailsToSerialize");
+    assertThat(region.get(KEY)).isNull();
+    assertThat(region2.get(KEY)).isNull();
+
+    vm0.invoke(() -> {
+      Region<String, DataSerializable> regionOnVm0 = cacheRule.getCache().getRegion(REGION_NAME);
+      Region<String, String> region2OnVm0 = cacheRule.getCache().getRegion(REGION_NAME2);
+      assertThat(regionOnVm0.get(KEY)).isNull();
+      assertThat(region2OnVm0.get(KEY)).isNull();
+    });
+  }
+
+  @Test
+  public void failureToPdxSerializeFails() {
+    Region<String, PdxSerializable> region = cacheRule.getCache().getRegion(REGION_NAME);
+    catchException(region).put(KEY, pdxValueFailsToSerialize);
+
+    Exception caughtException = caughtException();
+    assertThat(caughtException).isInstanceOf(ToDataException.class);
+    assertThat(caughtException.getCause()).isInstanceOf(UncheckedIOException.class);
+    assertThat(caughtException.getCause().getCause()).isInstanceOf(IOException.class)
+        .hasMessage("FailsToSerialize");
+    assertThat(region.get(KEY)).isNull();
+
+    vm0.invoke(() -> {
+      Region<String, PdxSerializable> regionOnVm0 = cacheRule.getCache().getRegion(REGION_NAME);
+      assertThat(regionOnVm0.get(KEY)).isNull();
+    });
+  }
+
+  private void createReplicateRegions() {
+    RegionFactory<?, ?> regionFactory = cacheRule.getOrCreateCache().createRegionFactory(REPLICATE);
+    regionFactory.create(REGION_NAME);
+    regionFactory.create(REGION_NAME2);
+  }
+
+  private static class FailsToDataSerialize implements DataSerializable {
+
+    @Override
+    public void toData(DataOutput out) throws IOException {
+      throw new IOException("FailsToSerialize");
+    }
+
+    @Override
+    public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+      // nothing
+    }
+  }
+
+  private static class FailsToPdxSerialize implements PdxSerializable {
+
+    @Override
+    public void toData(PdxWriter writer) {
+      throw new UncheckedIOException(new IOException("FailsToSerialize"));
+    }
+
+    @Override
+    public void fromData(PdxReader reader) {
+      // nothing
+    }
+  }
+
+  public static class PdxValue implements PdxSerializable {
+
+    @Override
+    public void toData(PdxWriter writer) {
+      // nothing
+    }
+
+    @Override
+    public void fromData(PdxReader reader) {
+      // nothing
+    }
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/EntryEventSerializationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/EntryEventSerializationTest.java
new file mode 100644
index 0000000..50ab538
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/EntryEventSerializationTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static com.googlecode.catchexception.CatchException.catchException;
+import static com.googlecode.catchexception.CatchException.caughtException;
+import static org.apache.geode.internal.lang.SystemPropertyHelper.EARLY_ENTRY_EVENT_SERIALIZATION;
+import static org.apache.geode.internal.lang.SystemPropertyHelper.GEODE_PREFIX;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.NotSerializableException;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.RestoreSystemProperties;
+import org.junit.experimental.categories.Category;
+import org.mockito.ArgumentCaptor;
+
+import org.apache.geode.SerializationException;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.pdx.internal.PdxInstanceImpl;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+/**
+ * Unit tests for {@link EntryEventSerialization} with early serialization enabled.
+ */
+@Category(UnitTest.class)
+public class EntryEventSerializationTest {
+
+  private InternalRegion region;
+  private InternalEntryEvent event;
+
+  private EntryEventSerialization instance;
+
+  @Rule
+  public RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();
+
+  @Before
+  public void setUp() {
+    System.setProperty(GEODE_PREFIX + EARLY_ENTRY_EVENT_SERIALIZATION, "true");
+
+    // defaults shared by every test: a distributed-ack region and a String new value
+    region = mock(InternalRegion.class);
+    when(region.getScope()).thenReturn(Scope.DISTRIBUTED_ACK);
+
+    event = mock(InternalEntryEvent.class);
+    when(event.basicGetNewValue()).thenReturn("newValue");
+
+    instance = new EntryEventSerialization();
+  }
+
+  @Test
+  public void serializeNewValueIfNeeded_bothNull() {
+    assertThatThrownBy(() -> instance.serializeNewValueIfNeeded(null, null))
+        .isInstanceOf(NullPointerException.class);
+  }
+
+  @Test
+  public void serializeNewValueIfNeeded_regionNull() {
+    assertThatThrownBy(() -> instance.serializeNewValueIfNeeded(null, event))
+        .isInstanceOf(NullPointerException.class);
+  }
+
+  @Test
+  public void serializeNewValueIfNeeded_eventNull() {
+    assertThatThrownBy(() -> instance.serializeNewValueIfNeeded(region, null))
+        .isInstanceOf(NullPointerException.class);
+  }
+
+  @Test
+  public void serializeNewValueIfNeeded_bothMocks() {
+    assertThatCode(() -> instance.serializeNewValueIfNeeded(region, event))
+        .doesNotThrowAnyException();
+  }
+
+  @Test
+  public void localRegionDoesNothing() {
+    when(region.getScope()).thenReturn(Scope.LOCAL);
+
+    instance.serializeNewValueIfNeeded(region, event);
+
+    verify(event, times(0)).setCachedSerializedNewValue(any());
+  }
+
+  @Test
+  public void distributedAckRegionSetsCachedSerializedNewValue() {
+    instance.serializeNewValueIfNeeded(region, event);
+
+    assertThat(capturedSerializedNewValue().length).isGreaterThan(0);
+  }
+
+  @Test
+  public void distributedNoAckRegionSetsCachedSerializedNewValue() {
+    when(region.getScope()).thenReturn(Scope.DISTRIBUTED_NO_ACK);
+
+    instance.serializeNewValueIfNeeded(region, event);
+
+    assertThat(capturedSerializedNewValue().length).isGreaterThan(0);
+  }
+
+  @Test
+  public void globalRegionSetsCachedSerializedNewValue() {
+    when(region.getScope()).thenReturn(Scope.GLOBAL);
+
+    instance.serializeNewValueIfNeeded(region, event);
+
+    assertThat(capturedSerializedNewValue().length).isGreaterThan(0);
+  }
+
+  @Test
+  public void hasCachedSerializedNewValueDoesNothing() {
+    // the event already reports cached serialized bytes (here an empty array)
+    when(event.getCachedSerializedNewValue()).thenReturn(new byte[0]);
+
+    instance.serializeNewValueIfNeeded(region, event);
+
+    verify(event, times(0)).setCachedSerializedNewValue(any());
+  }
+
+  @Test
+  public void newValueIsByteArrayDoesNothing() {
+    when(event.basicGetNewValue()).thenReturn(new byte[0]);
+
+    instance.serializeNewValueIfNeeded(region, event);
+
+    verify(event, times(0)).setCachedSerializedNewValue(any());
+  }
+
+  @Test
+  public void newValueIsCachedDeserializableUsesItsSerializedValue() {
+    byte[] serialized = new byte[] {0, 3, 4};
+    CachedDeserializable cachedValue = mock(CachedDeserializable.class);
+    when(cachedValue.getSerializedValue()).thenReturn(serialized);
+    when(event.basicGetNewValue()).thenReturn(cachedValue);
+
+    instance.serializeNewValueIfNeeded(region, event);
+
+    assertThat(capturedSerializedNewValue()).isEqualTo(serialized);
+  }
+
+  @Test
+  public void newValueIsSerializableUsesItsSerializedValue() {
+    String text = "newValue";
+    when(event.basicGetNewValue()).thenReturn(text);
+
+    instance.serializeNewValueIfNeeded(region, event);
+
+    assertThat(capturedSerializedNewValue()).isEqualTo(EntryEventImpl.serialize(text));
+  }
+
+  @Test
+  public void newValueIsNotSerializableThrows() {
+    // a plain Object does not implement Serializable
+    when(event.basicGetNewValue()).thenReturn(new Object());
+
+    catchException(instance).serializeNewValueIfNeeded(region, event);
+
+    Exception failure = caughtException();
+    assertThat(failure).isInstanceOf(SerializationException.class);
+    assertThat(failure.getCause()).isInstanceOf(NotSerializableException.class);
+  }
+
+  @Test
+  public void newValueIsPdxInstanceUsesItsSerializedValue() throws Exception {
+    byte[] serialized = new byte[] {0, 3, 4};
+    PdxInstanceImpl pdxValue = mock(PdxInstanceImpl.class);
+    when(pdxValue.toBytes()).thenReturn(serialized);
+    when(event.basicGetNewValue()).thenReturn(pdxValue);
+
+    instance.serializeNewValueIfNeeded(region, event);
+
+    assertThat(capturedSerializedNewValue()).isEqualTo(serialized);
+  }
+
+  /** Verifies a single setCachedSerializedNewValue call on the event; returns its argument. */
+  private byte[] capturedSerializedNewValue() {
+    ArgumentCaptor<byte[]> bytesCaptor = ArgumentCaptor.forClass(byte[].class);
+    verify(event, times(1)).setCachedSerializedNewValue(bytesCaptor.capture());
+    return bytesCaptor.getValue();
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
klund@apache.org.