You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2018/03/26 20:51:08 UTC

[geode] branch develop updated: GEODE-4822 The second server instance startup error: Could not create an instance of PartitionRegionConfig

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new d8ec1ea  GEODE-4822 The second server instance startup error: Could not create an instance of PartitionRegionConfig
d8ec1ea is described below

commit d8ec1ead880566c0f911f2cd304b634717ee6836
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Mon Mar 26 13:45:27 2018 -0700

    GEODE-4822 The second server instance startup error: Could not create an instance of PartitionRegionConfig
    
    Added readUserObject and doWithPdxSerialized.  Modified fromData methods
    of DataSerializableFixedID classes to use these methods when reading
    keys, values, callback arguments and function arguments.
    
    Added a unit test for the new methods.
---
 .../main/java/org/apache/geode/DataSerializer.java |  12 +-
 .../apache/geode/cache/query/internal/CqEntry.java |   3 +-
 .../internal/CumulativeNonDistinctResults.java     |   3 +-
 .../cache/query/internal/LinkedResultSet.java      |  10 +-
 .../cache/query/internal/LinkedStructSet.java      |  12 +-
 .../cache/query/internal/NWayMergeResults.java     |   3 +-
 .../geode/cache/query/internal/ResultsBag.java     |   3 +-
 .../query/internal/ResultsCollectionWrapper.java   |  12 +-
 .../geode/cache/query/internal/ResultsSet.java     |   8 +-
 .../cache/query/internal/SortedResultSet.java      |  10 +-
 .../cache/query/internal/SortedStructSet.java      |  36 +++-
 .../geode/cache/query/internal/StructImpl.java     |   5 +-
 .../geode/cache/query/internal/StructSet.java      |  26 ++-
 .../org/apache/geode/internal/DSFIDFactory.java    | 126 +++++------
 .../geode/internal/InternalDataSerializer.java     | 117 ++++++++---
 .../internal/admin/remote/DestroyEntryMessage.java |   3 +-
 .../internal/admin/remote/RemoteEntrySnapshot.java |   5 +-
 .../admin/remote/RemoteRegionSnapshot.java         |   3 +-
 .../geode/internal/cache/AbstractRegionMap.java    |   2 +-
 .../geode/internal/cache/DestroyOperation.java     |   3 +-
 .../cache/DestroyPartitionedRegionMessage.java     |   3 +-
 .../internal/cache/DistributedCacheOperation.java  |   2 +-
 .../internal/cache/DistributedPutAllOperation.java |   4 +-
 .../DistributedRegionFunctionStreamingMessage.java |   3 +-
 .../cache/DistributedRemoveAllOperation.java       |   2 +-
 .../geode/internal/cache/EntryEventImpl.java       |  10 +-
 .../cache/FunctionStreamingReplyMessage.java       |   3 +-
 .../geode/internal/cache/GemFireCacheImpl.java     |   5 +-
 .../internal/cache/InitialImageOperation.java      |   2 +-
 .../geode/internal/cache/InvalidateOperation.java  |   3 +-
 .../cache/InvalidatePartitionedRegionMessage.java  |   3 +-
 .../cache/LatestLastAccessTimeMessage.java         |   3 +-
 .../apache/geode/internal/cache/LocalRegion.java   |   5 +-
 .../geode/internal/cache/NonLocalRegionEntry.java  |   5 +-
 .../geode/internal/cache/QueuedOperation.java      |   5 +-
 .../cache/SearchLoadAndWriteProcessor.java         |  14 +-
 .../geode/internal/cache/TXCommitMessage.java      |  11 +-
 .../internal/cache/TXRegionLockRequestImpl.java    |   2 +-
 .../cache/UpdateEntryVersionOperation.java         |   3 +-
 .../geode/internal/cache/UpdateOperation.java      |   2 +-
 .../internal/cache/WrappedCallbackArgument.java    |   3 +-
 .../cache/execute/FunctionRemoteContext.java       |   3 +-
 .../cache/partitioned/ContainsKeyValueMessage.java |   3 +-
 .../internal/cache/partitioned/DestroyMessage.java |   4 +-
 .../DestroyRegionOnDataStoreMessage.java           |   3 +-
 .../cache/partitioned/FetchBulkEntriesMessage.java |  14 +-
 .../cache/partitioned/FetchEntryMessage.java       |   2 +-
 .../cache/partitioned/FetchKeysMessage.java        |  21 +-
 .../internal/cache/partitioned/GetMessage.java     |   7 +-
 .../cache/partitioned/PRTombstoneMessage.java      |   3 +-
 .../partitioned/PRUpdateEntryVersionMessage.java   |   3 +-
 .../cache/partitioned/PutAllPRMessage.java         |   2 +-
 .../internal/cache/partitioned/PutMessage.java     |   6 +-
 .../internal/cache/partitioned/QueryMessage.java   |   5 +-
 .../cache/partitioned/RemoveAllPRMessage.java      |   2 +-
 .../tier/sockets/ClientInterestMessageImpl.java    |   7 +-
 .../tier/sockets/ClientUpdateMessageImpl.java      |   2 +-
 .../cache/tier/sockets/ObjectPartList.java         |   5 +-
 .../cache/tier/sockets/ObjectPartList651.java      |   5 +-
 .../cache/tier/sockets/VersionedObjectList.java    |   4 +-
 .../geode/internal/cache/tx/DistTxEntryEvent.java  |   4 +-
 .../cache/tx/RemoteContainsKeyValueMessage.java    |   3 +-
 .../internal/cache/tx/RemoteDestroyMessage.java    |   6 +-
 .../internal/cache/tx/RemoteFetchEntryMessage.java |   2 +-
 .../internal/cache/tx/RemoteFetchKeysMessage.java  |  22 +-
 .../cache/tx/RemoteFetchVersionMessage.java        |   3 +-
 .../geode/internal/cache/tx/RemoteGetMessage.java  |   7 +-
 .../geode/internal/cache/tx/RemotePutMessage.java  |   8 +-
 .../internal/cache/wan/GatewaySenderEventImpl.java |   5 +-
 ...aySenderQueueEntrySynchronizationOperation.java |   3 +-
 .../cache/wan/serial/BatchDestroyOperation.java    |   3 +-
 .../org/apache/geode/internal/util/BlobHelper.java |  16 ++
 .../internal/cli/functions/CliFunctionResult.java  |  10 +-
 .../apache/geode/pdx/internal/PdxInstanceImpl.java |  19 +-
 .../apache/geode/pdx/internal/TypeRegistry.java    |  42 +++-
 .../geode/codeAnalysis/ClassAndMethodDetails.java  |   6 +-
 ...alDataSerializerSerializationWhitelistTest.java |   2 +
 .../cache/EntryEventSerializationTest.java         |   3 +
 .../internal/cache/PartitionRegionConfigTest.java  |   2 +
 .../geode/pdx/AutoSerializableJUnitTest.java       |   7 +-
 .../apache/geode/pdx/PdxSerializerJUnitTest.java   | 138 ++++++++++++
 .../geode/pdx/PdxSerializerRegressionTest.java     | 106 ++++++++++
 .../apache/geode/test/junit/rules/VMProvider.java  |   4 +
 .../codeAnalysis/sanctionedDataSerializables.txt   | 233 ++++++++++-----------
 .../lucene/internal/LuceneResultStructImpl.java    |   5 +-
 .../lucene/internal/distributed/EntryScore.java    |   3 +-
 .../cache/lucene/internal/results/PageEntry.java   |   5 +-
 87 files changed, 856 insertions(+), 407 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/DataSerializer.java b/geode-core/src/main/java/org/apache/geode/DataSerializer.java
index c7efb40..1bc8792 100644
--- a/geode-core/src/main/java/org/apache/geode/DataSerializer.java
+++ b/geode-core/src/main/java/org/apache/geode/DataSerializer.java
@@ -190,7 +190,7 @@ public abstract class DataSerializer {
       Boolean.getBoolean("DataSerializer.TRACE_SERIALIZABLE");
 
   /* Used to prevent standard Java serialization when sending data to a non-Java client */
-  protected static final ThreadLocal<Boolean> DISALLOW_JAVA_SERIALIZATION =
+  protected static final ThreadLocal<Boolean> disallowJavaSerializationForThread =
       new ThreadLocal<Boolean>();
 
   /**
@@ -2648,7 +2648,7 @@ public abstract class DataSerializer {
       return null;
     } else {
       Comparator<? super K> c =
-          InternalDataSerializer.<Comparator<? super K>>readNonPdxInstanceObject(in);
+          InternalDataSerializer.<Comparator<? super K>>readDeserializedObject(in);
       TreeMap<K, V> map = new TreeMap<K, V>(c);
       for (int i = 0; i < size; i++) {
         K key = DataSerializer.<K>readObject(in);
@@ -2786,7 +2786,7 @@ public abstract class DataSerializer {
       return null;
     } else {
       Comparator<? super E> c =
-          InternalDataSerializer.<Comparator<? super E>>readNonPdxInstanceObject(in);
+          InternalDataSerializer.<Comparator<? super E>>readDeserializedObject(in);
       TreeSet<E> set = new TreeSet<E>(c);
       for (int i = 0; i < size; i++) {
         E element = DataSerializer.<E>readObject(in);
@@ -2906,12 +2906,12 @@ public abstract class DataSerializer {
       return;
     }
 
-    DISALLOW_JAVA_SERIALIZATION.set(Boolean.TRUE);
+    disallowJavaSerializationForThread.set(Boolean.TRUE);
     try {
       writeObject(o, out);
     } finally {
-      DISALLOW_JAVA_SERIALIZATION.set(Boolean.FALSE); // with JDK 1.5 this can be changed to
-                                                      // remove()
+      disallowJavaSerializationForThread.set(Boolean.FALSE); // with JDK 1.5 this can be changed to
+      // remove()
     }
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/CqEntry.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/CqEntry.java
index 29b7a53..fb01a07 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/CqEntry.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/CqEntry.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import org.apache.geode.DataSerializer;
 import org.apache.geode.internal.DataSerializableFixedID;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.Version;
 
 
@@ -90,7 +91,7 @@ public class CqEntry implements DataSerializableFixedID {
   /* DataSerializableFixedID methods ---------------------------------------- */
 
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
-    this.key = DataSerializer.readObject(in);
+    this.key = InternalDataSerializer.readUserObject(in);
     this.value = DataSerializer.readObject(in);
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/CumulativeNonDistinctResults.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/CumulativeNonDistinctResults.java
index d0d63d5..2acc084 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/CumulativeNonDistinctResults.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/CumulativeNonDistinctResults.java
@@ -39,6 +39,7 @@ import org.apache.geode.cache.query.types.ObjectType;
 import org.apache.geode.internal.DataSerializableFixedID;
 import org.apache.geode.internal.HeapDataOutputStream;
 import org.apache.geode.internal.HeapDataOutputStream.LongUpdater;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.Version;
 
 /**
@@ -298,7 +299,7 @@ public class CumulativeNonDistinctResults<E> implements SelectResults<E>, DataSe
         Object[] fields = DataSerializer.readObjectArray(in);
         this.data.add((E) new StructImpl((StructTypeImpl) elementType, fields));
       } else {
-        E element = DataSerializer.readObject(in);
+        E element = InternalDataSerializer.readUserObject(in);
         this.data.add(element);
       }
       --numLeft;
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/LinkedResultSet.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/LinkedResultSet.java
index 949e007..b51364d 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/LinkedResultSet.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/LinkedResultSet.java
@@ -35,8 +35,10 @@ import org.apache.geode.cache.query.types.CollectionType;
 import org.apache.geode.cache.query.types.ObjectType;
 import org.apache.geode.cache.query.types.StructType;
 import org.apache.geode.internal.DataSerializableFixedID;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.i18n.LocalizedStrings;
+import org.apache.geode.pdx.internal.PdxInstanceImpl;
 
 public class LinkedResultSet extends java.util.LinkedHashSet
     implements Ordered, SelectResults, DataSerializableFixedID {
@@ -103,9 +105,11 @@ public class LinkedResultSet extends java.util.LinkedHashSet
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
     int size = in.readInt();
     this.elementType = (ObjectType) DataSerializer.readObject(in);
-    for (int j = size; j > 0; j--) {
-      this.add(DataSerializer.readObject(in));
-    }
+    InternalDataSerializer.doWithPdxReadSerialized(() -> {
+      for (int j = size; j > 0; j--) {
+        this.add(DataSerializer.readObject(in));
+      }
+    });
   }
 
   public void toData(DataOutput out) throws IOException {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/LinkedStructSet.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/LinkedStructSet.java
index 52aad36..8dfbc22 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/LinkedStructSet.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/LinkedStructSet.java
@@ -34,8 +34,10 @@ import org.apache.geode.cache.query.internal.types.StructTypeImpl;
 import org.apache.geode.cache.query.types.CollectionType;
 import org.apache.geode.cache.query.types.ObjectType;
 import org.apache.geode.internal.DataSerializableFixedID;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.i18n.LocalizedStrings;
+import org.apache.geode.pdx.internal.PdxInstanceImpl;
 
 public class LinkedStructSet extends LinkedHashSet<Struct>
     implements SelectResults<Struct>, Ordered, DataSerializableFixedID {
@@ -316,10 +318,12 @@ public class LinkedStructSet extends LinkedHashSet<Struct>
     this.modifiable = in.readBoolean();
     int size = in.readInt();
     this.structType = (StructTypeImpl) DataSerializer.readObject(in);
-    for (int j = size; j > 0; j--) {
-      Object[] fieldValues = DataSerializer.readObject(in);
-      this.add(new StructImpl(this.structType, fieldValues));
-    }
+    InternalDataSerializer.doWithPdxReadSerialized(() -> {
+      for (int j = size; j > 0; j--) {
+        Object[] fieldValues = DataSerializer.readObject(in);
+        this.add(new StructImpl(this.structType, fieldValues));
+      }
+    });
   }
 
   public int getDSFID() {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/NWayMergeResults.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/NWayMergeResults.java
index 9ba85d6..93299f6 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/NWayMergeResults.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/NWayMergeResults.java
@@ -38,6 +38,7 @@ import org.apache.geode.cache.query.types.ObjectType;
 import org.apache.geode.internal.DataSerializableFixedID;
 import org.apache.geode.internal.HeapDataOutputStream;
 import org.apache.geode.internal.HeapDataOutputStream.LongUpdater;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.Version;
 
 /**
@@ -449,7 +450,7 @@ public class NWayMergeResults<E> implements SelectResults<E>, Ordered, DataSeria
         Object[] fields = DataSerializer.readObjectArray(in);
         this.data.add((E) new StructImpl((StructTypeImpl) elementType, fields));
       } else {
-        E element = DataSerializer.readObject(in);
+        E element = InternalDataSerializer.readUserObject(in);
         this.data.add(element);
       }
       --numLeft;
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/ResultsBag.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/ResultsBag.java
index 6efbdd5..3241bc0 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/ResultsBag.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/ResultsBag.java
@@ -25,6 +25,7 @@ import org.apache.geode.cache.query.SelectResults;
 import org.apache.geode.cache.query.internal.ObjectIntHashMap.Entry;
 import org.apache.geode.cache.query.types.ObjectType;
 import org.apache.geode.internal.DataSerializableFixedID;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.CachePerfStats;
 
@@ -145,7 +146,7 @@ public class ResultsBag extends Bag implements DataSerializableFixedID {
     int numLeft = this.size - this.numNulls;
 
     while (numLeft > 0) {
-      Object key = DataSerializer.readObject(in);
+      Object key = InternalDataSerializer.readUserObject(in);
       int occurence = in.readInt();
       this.map.put(key, occurence);
       numLeft -= occurence;
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/ResultsCollectionWrapper.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/ResultsCollectionWrapper.java
index ecb1f4a..1634adf 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/ResultsCollectionWrapper.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/ResultsCollectionWrapper.java
@@ -529,11 +529,13 @@ public class ResultsCollectionWrapper implements SelectResults, DataSerializable
    */
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
     boolean isBagSetView = in.readBoolean();
-    if (isBagSetView) {
-      this.base = (Set) InternalDataSerializer.readSet(in);
-    } else {
-      this.base = (Collection) DataSerializer.readObject(in);
-    }
+    InternalDataSerializer.doWithPdxReadSerialized(() -> {
+      if (isBagSetView) {
+        this.base = (Set) InternalDataSerializer.readSet(in);
+      } else {
+        this.base = (Collection) DataSerializer.readObject(in);
+      }
+    });
     this.collectionType = (CollectionType) DataSerializer.readObject(in);
     this.modifiable = in.readBoolean();
   }
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/ResultsSet.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/ResultsSet.java
index 22bcabc..46625e5 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/ResultsSet.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/ResultsSet.java
@@ -124,9 +124,11 @@ public class ResultsSet extends HashSet implements SelectResults, DataSerializab
     ObjectTypeImpl clt = new ObjectTypeImpl();
     InternalDataSerializer.invokeFromData(clt, in);
     setElementType(clt);
-    for (int k = size; k > 0; k--) {
-      this.add(DataSerializer.readObject(in));
-    }
+    InternalDataSerializer.doWithPdxReadSerialized(() -> {
+      for (int k = size; k > 0; k--) {
+        this.add(DataSerializer.readObject(in));
+      }
+    });
   }
 
   public void toData(DataOutput out) throws IOException {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/SortedResultSet.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/SortedResultSet.java
index 2183e69..bcb0b66 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/SortedResultSet.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/SortedResultSet.java
@@ -23,8 +23,10 @@ import org.apache.geode.cache.query.*;
 import org.apache.geode.cache.query.internal.types.*;
 import org.apache.geode.cache.query.types.*;
 import org.apache.geode.internal.DataSerializableFixedID;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.i18n.LocalizedStrings;
+import org.apache.geode.pdx.internal.PdxInstanceImpl;
 
 /**
  * Implementation of SelectResults that extends TreeSet This is the sorted version of ResultSet used
@@ -104,9 +106,11 @@ public class SortedResultSet extends TreeSet
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
     int size = in.readInt();
     this.elementType = (ObjectType) DataSerializer.readObject(in);
-    for (int j = size; j > 0; j--) {
-      this.add(DataSerializer.readObject(in));
-    }
+    InternalDataSerializer.doWithPdxReadSerialized(() -> {
+      for (int j = size; j > 0; j--) {
+        this.add(DataSerializer.readObject(in));
+      }
+    });
   }
 
   public void toData(DataOutput out) throws IOException {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/SortedStructSet.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/SortedStructSet.java
index ca59fc0..3c3e151 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/SortedStructSet.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/SortedStructSet.java
@@ -14,14 +14,26 @@
  */
 package org.apache.geode.cache.query.internal;
 
-import java.io.*;
-import java.util.*;
-
-import org.apache.geode.*;
-import org.apache.geode.cache.query.*;
-import org.apache.geode.cache.query.internal.types.*;
-import org.apache.geode.cache.query.types.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.geode.DataSerializer;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.cache.query.Struct;
+import org.apache.geode.cache.query.internal.types.CollectionTypeImpl;
+import org.apache.geode.cache.query.internal.types.StructTypeImpl;
+import org.apache.geode.cache.query.types.CollectionType;
+import org.apache.geode.cache.query.types.ObjectType;
 import org.apache.geode.internal.DataSerializableFixedID;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 
@@ -343,10 +355,12 @@ public class SortedStructSet extends TreeSet
     this.modifiable = in.readBoolean();
     int size = in.readInt();
     this.structType = (StructTypeImpl) DataSerializer.readObject(in);
-    for (int j = size; j > 0; j--) {
-      Object[] fieldValues = DataSerializer.readObject(in);
-      this.addFieldValues(fieldValues);
-    }
+    InternalDataSerializer.doWithPdxReadSerialized(() -> {
+      for (int j = size; j > 0; j--) {
+        Object[] fieldValues = DataSerializer.readObject(in);
+        this.addFieldValues(fieldValues);
+      }
+    });
   }
 
   public void toData(DataOutput out) throws IOException {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/StructImpl.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/StructImpl.java
index 76bd3b3..4c298a9 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/StructImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/StructImpl.java
@@ -27,6 +27,7 @@ import org.apache.geode.cache.query.internal.types.StructTypeImpl;
 import org.apache.geode.cache.query.types.ObjectType;
 import org.apache.geode.cache.query.types.StructType;
 import org.apache.geode.internal.DataSerializableFixedID;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.pdx.PdxInstance;
@@ -165,7 +166,9 @@ public class StructImpl implements Struct, DataSerializableFixedID, Serializable
 
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
     this.type = (StructTypeImpl) DataSerializer.readObject(in);
-    this.values = DataSerializer.readObjectArray(in);
+    InternalDataSerializer.doWithPdxReadSerialized(() -> {
+      this.values = DataSerializer.readObjectArray(in);
+    });
     if (this.values != null) {
       for (Object o : values) {
         if (o instanceof PdxInstance) {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/StructSet.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/StructSet.java
index 5870632..92b9a37 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/StructSet.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/StructSet.java
@@ -17,17 +17,27 @@ package org.apache.geode.cache.query.internal;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
 
 import it.unimi.dsi.fastutil.objects.AbstractObjectIterator;
 import it.unimi.dsi.fastutil.objects.ObjectIterator;
 import it.unimi.dsi.fastutil.objects.ObjectOpenCustomHashSet;
 
 import org.apache.geode.DataSerializer;
-import org.apache.geode.cache.query.*;
-import org.apache.geode.cache.query.internal.types.*;
-import org.apache.geode.cache.query.types.*;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.cache.query.Struct;
+import org.apache.geode.cache.query.internal.types.CollectionTypeImpl;
+import org.apache.geode.cache.query.internal.types.StructTypeImpl;
+import org.apache.geode.cache.query.types.CollectionType;
+import org.apache.geode.cache.query.types.ObjectType;
+import org.apache.geode.cache.query.types.StructType;
 import org.apache.geode.internal.DataSerializableFixedID;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 
@@ -431,9 +441,11 @@ public class StructSet /* extends ObjectOpenCustomHashSet */ implements Set, Sel
     this.contents = new ObjectOpenCustomHashSet(new ObjectArrayHashingStrategy());
     int size = in.readInt();
     this.structType = (StructTypeImpl) DataSerializer.readObject(in);
-    for (int j = size; j > 0; j--) {
-      this.add(DataSerializer.readObject(in));
-    }
+    InternalDataSerializer.doWithPdxReadSerialized(() -> {
+      for (int j = size; j > 0; j--) {
+        this.add(DataSerializer.readObject(in));
+      }
+    });
   }
 
   public void toData(DataOutput out) throws IOException {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
index 204459f..c98fb5c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
@@ -413,6 +413,7 @@ import org.apache.geode.management.internal.configuration.messages.Configuration
 import org.apache.geode.pdx.internal.CheckTypeRegistryState;
 import org.apache.geode.pdx.internal.EnumId;
 import org.apache.geode.pdx.internal.EnumInfo;
+import org.apache.geode.pdx.internal.TypeRegistry;
 
 /**
  * Factory for instances of DataSerializableFixedID instances. Note that this class implements
@@ -944,68 +945,73 @@ public class DSFIDFactory implements DataSerializableFixedID {
    * data input.
    */
   public static Object create(int dsfid, DataInput in) throws IOException, ClassNotFoundException {
-    switch (dsfid) {
-      case REGION:
-        return (DataSerializableFixedID) DataSerializer.readRegion(in);
-      case END_OF_STREAM_TOKEN:
-        return Token.END_OF_STREAM;
-      case DLOCK_REMOTE_TOKEN:
-        return DLockRemoteToken.createFromDataInput(in);
-      case TRANSACTION_ID:
-        return TXId.createFromData(in);
-      case INTEREST_RESULT_POLICY:
-        return readInterestResultPolicy(in);
-      case UNDEFINED:
-        return readUndefined(in);
-      case RESULTS_BAG:
-        return readResultsBag(in);
-      case TOKEN_INVALID:
-        return Token.INVALID;
-      case TOKEN_LOCAL_INVALID:
-        return Token.LOCAL_INVALID;
-      case TOKEN_DESTROYED:
-        return Token.DESTROYED;
-      case TOKEN_REMOVED:
-        return Token.REMOVED_PHASE1;
-      case TOKEN_REMOVED2:
-        return Token.REMOVED_PHASE2;
-      case TOKEN_TOMBSTONE:
-        return Token.TOMBSTONE;
-      case NULL_TOKEN:
-        return readNullToken(in);
-      case CONFIGURATION_RESPONSE:
-        return readConfigurationResponse(in);
-      case PR_DESTROY_ON_DATA_STORE_MESSAGE:
-        return readDestroyOnDataStore(in);
-      default:
-        final Constructor<?> cons;
-        if (dsfid >= Byte.MIN_VALUE && dsfid <= Byte.MAX_VALUE) {
-          cons = dsfidMap[dsfid + Byte.MAX_VALUE + 1];
-        } else {
-          cons = (Constructor<?>) dsfidMap2.get(dsfid);
-        }
-        if (cons != null) {
-          try {
-            Object ds = cons.newInstance((Object[]) null);
-            InternalDataSerializer.invokeFromData(ds, in);
-            return ds;
-          } catch (InstantiationException ie) {
-            throw new IOException(ie.getMessage(), ie);
-          } catch (IllegalAccessException iae) {
-            throw new IOException(iae.getMessage(), iae);
-          } catch (InvocationTargetException ite) {
-            Throwable targetEx = ite.getTargetException();
-            if (targetEx instanceof IOException) {
-              throw (IOException) targetEx;
-            } else if (targetEx instanceof ClassNotFoundException) {
-              throw (ClassNotFoundException) targetEx;
-            } else {
-              throw new IOException(ite.getMessage(), targetEx);
+    boolean readSerializedOverride = TypeRegistry.getPdxReadSerialized();
+    TypeRegistry.setPdxReadSerialized(false);
+    try {
+      switch (dsfid) {
+        case REGION:
+          return (DataSerializableFixedID) DataSerializer.readRegion(in);
+        case END_OF_STREAM_TOKEN:
+          return Token.END_OF_STREAM;
+        case DLOCK_REMOTE_TOKEN:
+          return DLockRemoteToken.createFromDataInput(in);
+        case TRANSACTION_ID:
+          return TXId.createFromData(in);
+        case INTEREST_RESULT_POLICY:
+          return readInterestResultPolicy(in);
+        case UNDEFINED:
+          return readUndefined(in);
+        case RESULTS_BAG:
+          return readResultsBag(in);
+        case TOKEN_INVALID:
+          return Token.INVALID;
+        case TOKEN_LOCAL_INVALID:
+          return Token.LOCAL_INVALID;
+        case TOKEN_DESTROYED:
+          return Token.DESTROYED;
+        case TOKEN_REMOVED:
+          return Token.REMOVED_PHASE1;
+        case TOKEN_REMOVED2:
+          return Token.REMOVED_PHASE2;
+        case TOKEN_TOMBSTONE:
+          return Token.TOMBSTONE;
+        case NULL_TOKEN:
+          return readNullToken(in);
+        case CONFIGURATION_RESPONSE:
+          return readConfigurationResponse(in);
+        case PR_DESTROY_ON_DATA_STORE_MESSAGE:
+          return readDestroyOnDataStore(in);
+        default:
+          final Constructor<?> cons;
+          if (dsfid >= Byte.MIN_VALUE && dsfid <= Byte.MAX_VALUE) {
+            cons = dsfidMap[dsfid + Byte.MAX_VALUE + 1];
+          } else {
+            cons = (Constructor<?>) dsfidMap2.get(dsfid);
+          }
+          if (cons != null) {
+            try {
+              Object ds = cons.newInstance((Object[]) null);
+              InternalDataSerializer.invokeFromData(ds, in);
+              return ds;
+            } catch (InstantiationException ie) {
+              throw new IOException(ie.getMessage(), ie);
+            } catch (IllegalAccessException iae) {
+              throw new IOException(iae.getMessage(), iae);
+            } catch (InvocationTargetException ite) {
+              Throwable targetEx = ite.getTargetException();
+              if (targetEx instanceof IOException) {
+                throw (IOException) targetEx;
+              } else if (targetEx instanceof ClassNotFoundException) {
+                throw (ClassNotFoundException) targetEx;
+              } else {
+                throw new IOException(ite.getMessage(), targetEx);
+              }
             }
           }
-        }
-        throw new DSFIDNotFoundException("Unknown DataSerializableFixedID: " + dsfid, dsfid);
-
+          throw new DSFIDNotFoundException("Unknown DataSerializableFixedID: " + dsfid, dsfid);
+      }
+    } finally {
+      TypeRegistry.setPdxReadSerialized(readSerializedOverride);
     }
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java b/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
index c1ca15c..b0b0930 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
@@ -1749,7 +1749,7 @@ public abstract class InternalDataSerializer extends DataSerializer implements D
    *
    * @throws IOException If the serializer that can deserialize the object is not registered.
    */
-  private static Object readUserObject(DataInput in, int serializerId)
+  private static Object readUserClass(DataInput in, int serializerId)
       throws IOException, ClassNotFoundException {
     DataSerializer serializer = InternalDataSerializer.getSerializer(serializerId);
 
@@ -2249,7 +2249,7 @@ public abstract class InternalDataSerializer extends DataSerializer implements D
   }
 
   private static boolean disallowJavaSerialization() {
-    Boolean v = DISALLOW_JAVA_SERIALIZATION.get();
+    Boolean v = disallowJavaSerializationForThread.get();
     return v != null && v;
   }
 
@@ -2537,25 +2537,31 @@ public abstract class InternalDataSerializer extends DataSerializer implements D
 
   private static Object readDataSerializableFixedID(final DataInput in)
       throws IOException, ClassNotFoundException {
-    Class c = readClass(in);
+    boolean readSerializedOverride = TypeRegistry.getPdxReadSerialized();
+    TypeRegistry.setPdxReadSerialized(false);
     try {
-      Constructor init = c.getConstructor(new Class[0]);
-      init.setAccessible(true);
-      Object o = init.newInstance(new Object[0]);
+      Class c = readClass(in);
+      try {
+        Constructor init = c.getConstructor(new Class[0]);
+        init.setAccessible(true);
+        Object o = init.newInstance(new Object[0]);
 
-      invokeFromData(o, in);
+        invokeFromData(o, in);
 
-      if (logger.isTraceEnabled(LogMarker.SERIALIZER)) {
-        logger.trace(LogMarker.SERIALIZER, "Read DataSerializableFixedID {}", o);
-      }
+        if (logger.isTraceEnabled(LogMarker.SERIALIZER)) {
+          logger.trace(LogMarker.SERIALIZER, "Read DataSerializableFixedID {}", o);
+        }
 
-      return o;
+        return o;
 
-    } catch (Exception ex) {
-      throw new SerializationException(
-          LocalizedStrings.DataSerializer_COULD_NOT_CREATE_AN_INSTANCE_OF_0
-              .toLocalizedString(c.getName()),
-          ex);
+      } catch (Exception ex) {
+        throw new SerializationException(
+            LocalizedStrings.DataSerializer_COULD_NOT_CREATE_AN_INSTANCE_OF_0
+                .toLocalizedString(c.getName()),
+            ex);
+      }
+    } finally {
+      TypeRegistry.setPdxReadSerialized(readSerializedOverride);
     }
   }
 
@@ -2774,31 +2780,72 @@ public abstract class InternalDataSerializer extends DataSerializer implements D
     }
   }
 
-  private static DataSerializer dvddeserializer;
-
-  // TODO: registerDVDDeserializer is unused
-  public static void registerDVDDeserializer(DataSerializer dvddeslzr) {
-    dvddeserializer = dvddeslzr;
-  }
-
   /**
    * Just like readObject but make sure and pdx deserialized is not a PdxInstance.
    *
    * @since GemFire 6.6.2
    */
-  public static <T> T readNonPdxInstanceObject(final DataInput in)
+  public static <T> T readDeserializedObject(final DataInput in)
       throws IOException, ClassNotFoundException {
-    boolean wouldReadSerialized = PdxInstanceImpl.getPdxReadSerialized();
+    boolean wouldReadSerialized = TypeRegistry.getPdxReadSerialized();
     if (!wouldReadSerialized) {
       return DataSerializer.readObject(in);
     } else {
-      PdxInstanceImpl.setPdxReadSerialized(false);
+      TypeRegistry.setPdxReadSerialized(false);
+      try {
+        return DataSerializer.readObject(in);
+      } finally {
+        TypeRegistry.setPdxReadSerialized(true);
+      }
+    }
+  }
+
+  /**
+   * Just like readObject but override PdxInstanceImpl.getPdxReadSerialized(), allowing
+   * the result to be a PdxInstance if pdx-read-serialized is enabled on the cache.
+   * Use this to read cache keys, values and callback values in DataSerializableFixedID
+   * fromData methods, which have pdx-read-serialize disabled.
+   */
+  public static <T> T readUserObject(final DataInput in)
+      throws IOException, ClassNotFoundException {
+    boolean wouldReadSerialized = TypeRegistry.getPdxReadSerialized();
+    if (wouldReadSerialized) {
+      return DataSerializer.readObject(in);
+    } else {
+      TypeRegistry.setPdxReadSerialized(true);
       try {
         return DataSerializer.readObject(in);
       } finally {
-        PdxInstanceImpl.setPdxReadSerialized(true);
+        TypeRegistry.setPdxReadSerialized(false);
+      }
+    }
+  }
+
+  /**
+   * This method is used by DataSerializableFixedID objects in their fromData methods
+   * to deserialize application objects such as keys, values and callback arguments.
+   * It allows the value to be read as a PdxInstance instead of being completely
+   * deserialized into a POJO.
+   *
+   * @param runnable code performing deserialization with PdxInstanceImpl.setPdxReadSerialized set
+   *        to true
+   * @throws ClassNotFoundException
+   * @throws IOException
+   */
+  public static void doWithPdxReadSerialized(RunnableThrowingException runnable)
+      throws ClassNotFoundException, IOException {
+    boolean isAlreadySet = TypeRegistry.getPdxReadSerialized();
+    if (!isAlreadySet) {
+      TypeRegistry.setPdxReadSerialized(true);
+    }
+    try {
+      runnable.run();
+    } finally {
+      if (!isAlreadySet) {
+        TypeRegistry.setPdxReadSerialized(false);
       }
     }
+
   }
 
   public static Object basicReadObject(final DataInput in)
@@ -2894,11 +2941,11 @@ public abstract class InternalDataSerializer extends DataSerializer implements D
       case TIME_UNIT:
         return readTimeUnit(in);
       case USER_CLASS:
-        return readUserObject(in, in.readByte());
+        return readUserClass(in, in.readByte());
       case USER_CLASS_2:
-        return readUserObject(in, in.readShort());
+        return readUserClass(in, in.readShort());
       case USER_CLASS_4:
-        return readUserObject(in, in.readInt());
+        return readUserClass(in, in.readInt());
       case VECTOR:
         return readVector(in);
       case STACK:
@@ -3849,4 +3896,14 @@ public abstract class InternalDataSerializer extends DataSerializer implements D
       classCache.clear();
     }
   }
+
+
+  /**
+   * @see #doWithPdxReadSerialized
+   */
+  @FunctionalInterface
+  public interface RunnableThrowingException {
+    void run() throws ClassNotFoundException, IOException;
+  }
+
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/admin/remote/DestroyEntryMessage.java b/geode-core/src/main/java/org/apache/geode/internal/admin/remote/DestroyEntryMessage.java
index 2e0300d..bfacf32 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/admin/remote/DestroyEntryMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/admin/remote/DestroyEntryMessage.java
@@ -25,6 +25,7 @@ import org.apache.geode.DataSerializer;
 import org.apache.geode.cache.ExpirationAction;
 import org.apache.geode.cache.Region;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LocalizedMessage;
@@ -84,7 +85,7 @@ public class DestroyEntryMessage extends RegionAdminMessage {
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
     super.fromData(in);
     this.action = (ExpirationAction) DataSerializer.readObject(in);
-    this.key = DataSerializer.readObject(in);
+    this.key = InternalDataSerializer.readUserObject(in);
   }
 
   @Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/admin/remote/RemoteEntrySnapshot.java b/geode-core/src/main/java/org/apache/geode/internal/admin/remote/RemoteEntrySnapshot.java
index 842b55f..b23d374 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/admin/remote/RemoteEntrySnapshot.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/admin/remote/RemoteEntrySnapshot.java
@@ -20,6 +20,7 @@ import java.io.*;
 import org.apache.geode.DataSerializable;
 import org.apache.geode.DataSerializer;
 import org.apache.geode.cache.*;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.admin.*;
 
 public class RemoteEntrySnapshot implements EntrySnapshot, DataSerializable {
@@ -123,8 +124,8 @@ public class RemoteEntrySnapshot implements EntrySnapshot, DataSerializable {
 
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
     this.name = DataSerializer.readObject(in);
-    this.value = DataSerializer.readObject(in);
+    this.value = InternalDataSerializer.readUserObject(in);
     this.stats = (RemoteCacheStatistics) DataSerializer.readObject(in);
-    this.userAttribute = DataSerializer.readObject(in);
+    this.userAttribute = InternalDataSerializer.readUserObject(in);
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/admin/remote/RemoteRegionSnapshot.java b/geode-core/src/main/java/org/apache/geode/internal/admin/remote/RemoteRegionSnapshot.java
index 961c1a7..f47b16c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/admin/remote/RemoteRegionSnapshot.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/admin/remote/RemoteRegionSnapshot.java
@@ -21,6 +21,7 @@ import java.util.*;
 import org.apache.geode.DataSerializable;
 import org.apache.geode.DataSerializer;
 import org.apache.geode.cache.*;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.admin.*;
 
 public class RemoteRegionSnapshot implements RegionSnapshot, DataSerializable {
@@ -151,6 +152,6 @@ public class RemoteRegionSnapshot implements RegionSnapshot, DataSerializable {
     this.attributes = (RemoteRegionAttributes) DataSerializer.readObject(in);
     this.entryCount = in.readInt();
     this.subregionCount = in.readInt();
-    this.userAttribute = DataSerializer.readObject(in);
+    this.userAttribute = InternalDataSerializer.readUserObject(in);
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
index 100264a..08eacf8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
@@ -826,7 +826,7 @@ public abstract class AbstractRegionMap
       CachedDeserializable newValueCd = (CachedDeserializable) newValue;
       try {
         actualVal = BlobHelper.deserializeBlob(newValueCd.getSerializedValue(),
-            sender.getVersionObject(), null);
+            sender.getVersionObject(), null, true);
         newValue = new VMCachedDeserializable(actualVal, newValueCd.getSizeInBytes());
       } catch (IOException | ClassNotFoundException e) {
         throw new RuntimeException("Unable to deserialize HA event for region " + owner);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java
index 3371c9c..ebc0242 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java
@@ -26,6 +26,7 @@ import org.apache.geode.cache.TimeoutException;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
 import org.apache.geode.distributed.internal.ConflationKey;
 import org.apache.geode.distributed.internal.DirectReplyProcessor;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
 import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException;
 import org.apache.geode.internal.i18n.LocalizedStrings;
@@ -158,7 +159,7 @@ public class DestroyOperation extends DistributedCacheOperation {
     public void fromData(DataInput in) throws IOException, ClassNotFoundException {
       super.fromData(in);
       this.eventId = (EventID) DataSerializer.readObject(in);
-      this.key = DataSerializer.readObject(in);
+      this.key = InternalDataSerializer.readUserObject(in);
       Boolean hasTailKey = DataSerializer.readBoolean(in);
       if (hasTailKey.booleanValue()) {
         this.tailKey = DataSerializer.readLong(in);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyPartitionedRegionMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyPartitionedRegionMessage.java
index 8db231b..838fa83 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyPartitionedRegionMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyPartitionedRegionMessage.java
@@ -32,6 +32,7 @@ import org.apache.geode.distributed.internal.ReplyException;
 import org.apache.geode.distributed.internal.ReplyMessage;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.cache.partitioned.PartitionMessage;
 import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
 import org.apache.geode.internal.cache.partitioned.RegionAdvisor.PartitionProfile;
@@ -190,7 +191,7 @@ public class DestroyPartitionedRegionMessage extends PartitionMessage {
   @Override
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
     super.fromData(in);
-    this.cbArg = DataSerializer.readObject(in);
+    this.cbArg = InternalDataSerializer.readUserObject(in);
     this.op = Operation.fromOrdinal(in.readByte());
     this.prSerial = in.readInt();
     int len = in.readInt();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
index fb6aadf..ad1fd75 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
@@ -1358,7 +1358,7 @@ public abstract class DistributedCacheOperation {
       this.directAck = (bits & DIRECT_ACK_MASK) != 0;
       this.possibleDuplicate = (bits & POSSIBLE_DUPLICATE_MASK) != 0;
       if ((bits & CALLBACK_ARG_MASK) != 0) {
-        this.callbackArg = DataSerializer.readObject(in);
+        this.callbackArg = InternalDataSerializer.readUserObject(in);
       }
       this.hasDelta = (bits & DELTA_MASK) != 0;
       this.hasOldValue = (bits & OLD_VALUE_MASK) != 0;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java
index bb95f44..f15e9be 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java
@@ -330,10 +330,10 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation {
      */
     public PutAllEntryData(DataInput in, EventID baseEventID, int idx, Version version,
         ByteArrayDataInput bytesIn) throws IOException, ClassNotFoundException {
-      this.key = DataSerializer.readObject(in);
+      this.key = InternalDataSerializer.readUserObject(in);
       byte flgs = in.readByte();
       if ((flgs & IS_OBJECT) != 0) {
-        this.value = DataSerializer.readObject(in);
+        this.value = InternalDataSerializer.readUserObject(in);
       } else {
         byte[] bb = DataSerializer.readByteArray(in);
         if ((flgs & IS_CACHED_DESER) != 0) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegionFunctionStreamingMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegionFunctionStreamingMessage.java
index 3e2e0c5..ee39e41d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegionFunctionStreamingMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegionFunctionStreamingMessage.java
@@ -40,6 +40,7 @@ import org.apache.geode.distributed.internal.ReplyException;
 import org.apache.geode.distributed.internal.ReplyMessage;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LogMarker;
@@ -332,7 +333,7 @@ public class DistributedRegionFunctionStreamingMessage extends DistributionMessa
       this.functionObject = (Function) object;
       this.isFnSerializationReqd = true;
     }
-    this.args = (Serializable) DataSerializer.readObject(in);
+    this.args = (Serializable) InternalDataSerializer.readUserObject(in);
     this.filter = (HashSet) DataSerializer.readHashSet(in);
     this.regionPath = DataSerializer.readString(in);
     this.isReExecute = (flags & IS_REEXECUTE) != 0;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java
index d254e78..6b35740 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java
@@ -315,7 +315,7 @@ public class DistributedRemoveAllOperation extends AbstractUpdateOperation {
      */
     public RemoveAllEntryData(DataInput in, EventID baseEventID, int idx, Version version,
         ByteArrayDataInput bytesIn) throws IOException, ClassNotFoundException {
-      this.key = DataSerializer.readObject(in);
+      this.key = InternalDataSerializer.readUserObject(in);
       this.oldValue = null;
       this.op = Operation.fromOrdinal(in.readByte());
       this.flags = in.readByte();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
index d562d2c..f3c536d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
@@ -194,12 +194,12 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent,
    */
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
     this.eventID = (EventID) DataSerializer.readObject(in);
-    Object key = DataSerializer.readObject(in);
-    Object value = DataSerializer.readObject(in);
+    Object key = InternalDataSerializer.readUserObject(in);
+    Object value = InternalDataSerializer.readUserObject(in);
     this.keyInfo = new KeyInfo(key, value, null);
     this.op = Operation.fromOrdinal(in.readByte());
     this.eventFlags = in.readShort();
-    this.keyInfo.setCallbackArg(DataSerializer.readObject(in));
+    this.keyInfo.setCallbackArg(InternalDataSerializer.readUserObject(in));
     this.txId = (TXId) DataSerializer.readObject(in);
 
     if (in.readBoolean()) { // isDelta
@@ -214,7 +214,7 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent,
       } else {
         this.newValueBytes = null;
         this.cachedSerializedNewValue = null;
-        this.newValue = DataSerializer.readObject(in);
+        this.newValue = InternalDataSerializer.readUserObject(in);
       }
     }
 
@@ -225,7 +225,7 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent,
       this.oldValue = null; // set later in basicGetOldValue
     } else {
       this.oldValueBytes = null;
-      this.oldValue = DataSerializer.readObject(in);
+      this.oldValue = InternalDataSerializer.readUserObject(in);
     }
     this.distributedMember = DSFIDFactory.readInternalDistributedMember(in);
     this.context = ClientProxyMembershipID.readCanonicalized(in);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/FunctionStreamingReplyMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/FunctionStreamingReplyMessage.java
index e6dcde6..01fe585 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/FunctionStreamingReplyMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/FunctionStreamingReplyMessage.java
@@ -28,6 +28,7 @@ import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.ReplyException;
 import org.apache.geode.distributed.internal.ReplyMessage;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.logging.LogService;
 
 public class FunctionStreamingReplyMessage extends ReplyMessage {
@@ -87,7 +88,7 @@ public class FunctionStreamingReplyMessage extends ReplyMessage {
     this.lastMsg = in.readBoolean();
     this.processorId = in.readInt();
     try {
-      this.result = DataSerializer.readObject(in);
+      this.result = InternalDataSerializer.readUserObject(in);
     } catch (Exception e) { // bug fix 40670
       // Seems odd to throw a NonSerializableEx when it has already been
       // serialized and we are failing because we can't deserialize.
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index c4b11b4..f6b35c1 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -240,7 +240,6 @@ import org.apache.geode.pdx.PdxSerializer;
 import org.apache.geode.pdx.ReflectionBasedAutoSerializer;
 import org.apache.geode.pdx.internal.AutoSerializableManager;
 import org.apache.geode.pdx.internal.PdxInstanceFactoryImpl;
-import org.apache.geode.pdx.internal.PdxInstanceImpl;
 import org.apache.geode.pdx.internal.TypeRegistry;
 import org.apache.geode.redis.GeodeRedisServer;
 
@@ -5066,7 +5065,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
     }
 
     return (getPdxReadSerialized() || pdxReadSerializedOverriden)
-        && PdxInstanceImpl.getPdxReadSerialized();
+        && TypeRegistry.getPdxReadSerialized();
   }
 
   @Override
@@ -5158,7 +5157,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
 
   @Override
   public void setReadSerializedForCurrentThread(boolean value) {
-    PdxInstanceImpl.setPdxReadSerialized(value);
+    TypeRegistry.setPdxReadSerialized(value);
     this.setPdxReadSerializedOverride(value);
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
index 068ca70..e0b9a4f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
@@ -3024,7 +3024,7 @@ public class InitialImageOperation {
     public void fromData(DataInput in) throws IOException, ClassNotFoundException {
       this.entryBits = in.readByte();
       byte flags = in.readByte();
-      this.key = DataSerializer.readObject(in);
+      this.key = InternalDataSerializer.readUserObject(in);
 
       if (EntryBits.isTombstone(this.entryBits)) {
         this.value = Token.TOMBSTONE;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InvalidateOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InvalidateOperation.java
index 11e7273..d085cdb 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InvalidateOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InvalidateOperation.java
@@ -27,6 +27,7 @@ import org.apache.geode.cache.EntryNotFoundException;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
 import org.apache.geode.distributed.internal.ConflationKey;
 import org.apache.geode.distributed.internal.DirectReplyProcessor;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
 import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException;
 import org.apache.geode.internal.logging.LogService;
@@ -128,7 +129,7 @@ public class InvalidateOperation extends DistributedCacheOperation {
     public void fromData(DataInput in) throws IOException, ClassNotFoundException {
       super.fromData(in);
       this.eventId = (EventID) DataSerializer.readObject(in);
-      this.key = DataSerializer.readObject(in);
+      this.key = InternalDataSerializer.readUserObject(in);
     }
 
     @Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InvalidatePartitionedRegionMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InvalidatePartitionedRegionMessage.java
index 99937d0..38df994 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InvalidatePartitionedRegionMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InvalidatePartitionedRegionMessage.java
@@ -28,6 +28,7 @@ import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.query.QueryException;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.cache.partitioned.PartitionMessage;
 
 /**
@@ -93,7 +94,7 @@ public class InvalidatePartitionedRegionMessage extends PartitionMessage {
   @Override
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
     super.fromData(in);
-    this.callbackArg = DataSerializer.readObject(in);
+    this.callbackArg = InternalDataSerializer.readUserObject(in);
   }
 
   /*
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessage.java
index 12fe856..766c11d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessage.java
@@ -25,6 +25,7 @@ import org.apache.geode.distributed.internal.MessageWithReply;
 import org.apache.geode.distributed.internal.PooledDistributionMessage;
 import org.apache.geode.distributed.internal.ReplyMessage;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.InternalStatisticsDisabledException;
 
 /**
@@ -80,7 +81,7 @@ public class LatestLastAccessTimeMessage<K> extends PooledDistributionMessage
     super.fromData(in);
     this.processorId = DataSerializer.readPrimitiveInt(in);
     this.regionName = DataSerializer.readString(in);
-    this.key = DataSerializer.readObject(in);
+    this.key = InternalDataSerializer.readUserObject(in);
   }
 
   @Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index b20c838..a1ae0a5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -158,6 +158,7 @@ import org.apache.geode.i18n.StringId;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.ClassLoadUtil;
 import org.apache.geode.internal.HeapDataOutputStream;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.NanoTimer;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.AbstractRegionMap.ARMLockTestHook;
@@ -4532,7 +4533,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
                 .toLocalizedString(new Object[] {snapshotVersion, SNAPSHOT_VERSION}));
       }
       for (;;) {
-        Object key = DataSerializer.readObject(in);
+        Object key = InternalDataSerializer.readUserObject(in);
         if (key == null) {
           break;
         }
@@ -4540,7 +4541,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
         Object value;
 
         if (aByte == SNAPSHOT_VALUE_OBJ) {
-          value = DataSerializer.readObject(in);
+          value = InternalDataSerializer.readUserObject(in);
         } else if (aByte == SNAPSHOT_VALUE_INVALID || aByte == SNAPSHOT_VALUE_LOCAL_INVALID) {
           // Even though it was a distributed invalidate when the snapshot was created I think it is
           // correct to turn it into a local invalidate when we load the snapshot since we don't do
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/NonLocalRegionEntry.java b/geode-core/src/main/java/org/apache/geode/internal/cache/NonLocalRegionEntry.java
index 1d96131..5deea73 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/NonLocalRegionEntry.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/NonLocalRegionEntry.java
@@ -30,6 +30,7 @@ import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.ByteArrayDataInput;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.InitialImageOperation.Entry;
 import org.apache.geode.internal.cache.eviction.EvictionList;
@@ -153,8 +154,8 @@ public class NonLocalRegionEntry implements RegionEntry, VersionStamp {
   }
 
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
-    this.key = DataSerializer.readObject(in);
-    this.value = DataSerializer.readObject(in);
+    this.key = InternalDataSerializer.readUserObject(in);
+    this.value = InternalDataSerializer.readUserObject(in);
     this.lastModified = in.readLong();
     this.isRemoved = in.readBoolean();
     this.versionTag = (VersionTag) DataSerializer.readObject(in);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/QueuedOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/QueuedOperation.java
index 1c33cc8..4adffbd 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/QueuedOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/QueuedOperation.java
@@ -19,6 +19,7 @@ import java.io.*;
 import org.apache.geode.*;
 import org.apache.geode.cache.*;
 import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.offheap.annotations.Released;
@@ -140,9 +141,9 @@ public class QueuedOperation {
     byte[] value = null;
     Object valueObj = null;
     byte deserializationPolicy = DistributedCacheOperation.DESERIALIZATION_POLICY_NONE;
-    Object cbArg = DataSerializer.readObject(in);
+    Object cbArg = InternalDataSerializer.readUserObject(in);
     if (op.isEntry()) {
-      key = DataSerializer.readObject(in);
+      key = InternalDataSerializer.readUserObject(in);
       if (op.isUpdate() || op.isCreate()) {
         deserializationPolicy = in.readByte();
         value = DataSerializer.readByteArray(in);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java
index b55c9f4..a36bc3a 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java
@@ -1460,7 +1460,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
         ReplyProcessor21.setMessageRPId(this.processorId);
       }
       this.regionName = in.readUTF();
-      this.key = DataSerializer.readObject(in);
+      this.key = InternalDataSerializer.readUserObject(in);
       this.timeoutMs = in.readInt();
       if ((flags & HAS_TTL) != 0) {
         this.ttl = (int) InternalDataSerializer.readSignedVL(in);
@@ -1696,9 +1696,9 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
     @Override
     public void fromData(DataInput in) throws IOException, ClassNotFoundException {
       super.fromData(in);
-      this.key = DataSerializer.readObject(in);
+      this.key = InternalDataSerializer.readUserObject(in);
       this.processorId = in.readInt();
-      this.result = DataSerializer.readObject(in);
+      this.result = InternalDataSerializer.readUserObject(in);
       this.lastModified = in.readLong();
       this.isPresent = in.readBoolean();
       this.isSerialized = in.readBoolean();
@@ -1815,7 +1815,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
         ReplyProcessor21.setMessageRPId(this.processorId);
       }
       this.regionName = in.readUTF();
-      this.key = DataSerializer.readObject(in);
+      this.key = InternalDataSerializer.readUserObject(in);
       this.timeoutMs = in.readInt();
       if ((flags & HAS_TTL) != 0) {
         this.ttl = (int) InternalDataSerializer.readSignedVL(in);
@@ -2187,8 +2187,8 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
       super.fromData(in);
       this.processorId = in.readInt();
       this.regionName = in.readUTF();
-      this.key = DataSerializer.readObject(in);
-      this.aCallbackArgument = DataSerializer.readObject(in);
+      this.key = InternalDataSerializer.readUserObject(in);
+      this.aCallbackArgument = InternalDataSerializer.readUserObject(in);
       this.timeoutMs = in.readInt();
       this.ttl = in.readInt();
       this.idleTime = in.readInt();
@@ -2366,7 +2366,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
       super.fromData(in);
       this.processorId = in.readInt();
       this.result = DataSerializer.readByteArray(in);
-      this.aCallbackArgument = DataSerializer.readObject(in);
+      this.aCallbackArgument = InternalDataSerializer.readUserObject(in);
       this.e = (Exception) DataSerializer.readObject(in);
       this.isSerialized = in.readBoolean();
       this.requestorTimedOut = in.readBoolean();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
index 98b1f2e..635af30 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
@@ -1384,6 +1384,7 @@ public class TXCommitMessage extends PooledDistributionMessage
         for (int i = 0; i < size; i++) {
           FarSideEntryOp entryOp = new FarSideEntryOp();
           // shadowkey is not being sent to clients
+          // TODO: this fromData invocation is not backward-compatible
           entryOp.fromData(in, largeModCount, hasShadowKey(regionPath, parentRegionPath));
           if (entryOp.versionTag != null && this.memberId != null) {
             entryOp.versionTag.setMemberID(this.memberId);
@@ -1526,14 +1527,14 @@ public class TXCommitMessage extends PooledDistributionMessage
        */
       public void fromData(DataInput in, boolean largeModCount, boolean readShadowKey)
           throws IOException, ClassNotFoundException {
-        this.key = DataSerializer.readObject(in);
+        this.key = InternalDataSerializer.readUserObject(in);
         this.op = Operation.fromOrdinal(in.readByte());
         if (largeModCount) {
           this.modSerialNum = in.readInt();
         } else {
           this.modSerialNum = in.readByte();
         }
-        this.callbackArg = DataSerializer.readObject(in);
+        this.callbackArg = InternalDataSerializer.readUserObject(in);
         this.filterRoutingInfo = DataSerializer.readObject(in);
         this.versionTag = DataSerializer.readObject(in);
         if (readShadowKey) {
@@ -1549,8 +1550,10 @@ public class TXCommitMessage extends PooledDistributionMessage
               this.value = DataSerializer.readObject(in);
             } else {
               // CachedDeserializable, Object, or PDX
-              this.value = CachedDeserializableFactory.create(DataSerializer.readByteArray(in),
-                  GemFireCacheImpl.getInstance());
+              InternalDataSerializer.doWithPdxReadSerialized(() -> {
+                this.value = CachedDeserializableFactory.create(DataSerializer.readByteArray(in),
+                    GemFireCacheImpl.getInstance());
+              });
             }
           }
         }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXRegionLockRequestImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRegionLockRequestImpl.java
index ca33352..5b5425d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXRegionLockRequestImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRegionLockRequestImpl.java
@@ -128,7 +128,7 @@ public class TXRegionLockRequestImpl implements TXRegionLockRequest {
     final HashSet<Object> set = new HashSet<Object>(size);
     Object key;
     for (int i = 0; i < size; i++) {
-      key = DataSerializer.readObject(in);
+      key = InternalDataSerializer.readUserObject(in);
       set.add(key);
     }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/UpdateEntryVersionOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/UpdateEntryVersionOperation.java
index 1d3fdf4..ec6bfc0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/UpdateEntryVersionOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/UpdateEntryVersionOperation.java
@@ -28,6 +28,7 @@ import org.apache.geode.cache.EntryNotFoundException;
 import org.apache.geode.cache.TimeoutException;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
 import org.apache.geode.distributed.internal.DirectReplyProcessor;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
@@ -152,7 +153,7 @@ public class UpdateEntryVersionOperation extends DistributedCacheOperation {
     public void fromData(DataInput in) throws IOException, ClassNotFoundException {
       super.fromData(in);
       this.eventId = (EventID) DataSerializer.readObject(in);
-      this.key = DataSerializer.readObject(in);
+      this.key = InternalDataSerializer.readUserObject(in);
       Boolean hasTailKey = DataSerializer.readBoolean(in);
       if (hasTailKey.booleanValue()) {
         this.tailKey = DataSerializer.readLong(in);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/UpdateOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/UpdateOperation.java
index a526919..8a58b02 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/UpdateOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/UpdateOperation.java
@@ -359,7 +359,7 @@ public class UpdateOperation extends AbstractUpdateOperation {
       } else {
         this.eventId = null;
       }
-      this.key = DataSerializer.readObject(in);
+      this.key = InternalDataSerializer.readUserObject(in);
 
       this.deserializationPolicy = (byte) (extraFlags & DESERIALIZATION_POLICY_MASK);
       if (hasDelta()) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/WrappedCallbackArgument.java b/geode-core/src/main/java/org/apache/geode/internal/cache/WrappedCallbackArgument.java
index 70b0788..1e326e5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/WrappedCallbackArgument.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/WrappedCallbackArgument.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import org.apache.geode.DataSerializer;
 import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.InternalDataSerializer;
 
 /**
  * Used to create subclasses that wrap another callback argument by having a reference to the
@@ -67,7 +68,7 @@ public abstract class WrappedCallbackArgument {
   }
 
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
-    this._originalCallbackArg = DataSerializer.readObject(in);
+    this._originalCallbackArg = InternalDataSerializer.readUserObject(in);
   }
 
   void setOriginalCallbackArgument(Object origCallbackArg) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/FunctionRemoteContext.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/FunctionRemoteContext.java
index 968aea2..45c2316 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/FunctionRemoteContext.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/FunctionRemoteContext.java
@@ -24,6 +24,7 @@ import org.apache.geode.DataSerializable;
 import org.apache.geode.DataSerializer;
 import org.apache.geode.cache.execute.Function;
 import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.internal.InternalDataSerializer;
 
 /**
  * FunctionContext for remote/target nodes
@@ -70,7 +71,7 @@ public class FunctionRemoteContext implements DataSerializable {
       this.function = (Function) object;
       this.isFnSerializationReqd = true;
     }
-    this.args = DataSerializer.readObject(in);
+    this.args = InternalDataSerializer.readUserObject(in);
     this.filter = (HashSet) DataSerializer.readHashSet(in);
     this.bucketSet = (HashSet) DataSerializer.readHashSet(in);
     this.isReExecute = DataSerializer.readBoolean(in);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ContainsKeyValueMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ContainsKeyValueMessage.java
index 25560bb..3e4506c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ContainsKeyValueMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ContainsKeyValueMessage.java
@@ -35,6 +35,7 @@ import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.distributed.internal.ReplySender;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.cache.ForceReattemptException;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.PartitionedRegionDataStore;
@@ -157,7 +158,7 @@ public class ContainsKeyValueMessage extends PartitionMessageWithDirectReply {
   @Override
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
     super.fromData(in);
-    this.key = DataSerializer.readObject(in);
+    this.key = InternalDataSerializer.readUserObject(in);
     this.valueCheck = in.readBoolean();
     this.bucketId = Integer.valueOf(in.readInt());
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DestroyMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DestroyMessage.java
index f4f238e..4aeaedd 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DestroyMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DestroyMessage.java
@@ -338,14 +338,14 @@ public class DestroyMessage extends PartitionMessageWithDirectReply {
   @Override
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
     super.fromData(in);
-    setKey(DataSerializer.readObject(in));
+    setKey(InternalDataSerializer.readUserObject(in));
     this.cbArg = DataSerializer.readObject(in);
     this.op = Operation.fromOrdinal(in.readByte());
     this.notificationOnly = in.readBoolean();
     this.bridgeContext = ClientProxyMembershipID.readCanonicalized(in);
     this.originalSender = (InternalDistributedMember) DataSerializer.readObject(in);
     this.eventId = (EventID) DataSerializer.readObject(in);
-    this.expectedOldValue = DataSerializer.readObject(in);
+    this.expectedOldValue = InternalDataSerializer.readUserObject(in);
 
     final boolean hasFilterInfo = ((flags & HAS_FILTER_INFO) != 0);
     if (hasFilterInfo) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DestroyRegionOnDataStoreMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DestroyRegionOnDataStoreMessage.java
index 64bc795..ca88b54 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DestroyRegionOnDataStoreMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DestroyRegionOnDataStoreMessage.java
@@ -24,6 +24,7 @@ import org.apache.geode.distributed.internal.ClusterDistributionManager;
 import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.logging.log4j.LogMarker;
 
@@ -96,7 +97,7 @@ public class DestroyRegionOnDataStoreMessage extends PartitionMessage {
   @Override
   public void fromData(final DataInput in) throws IOException, ClassNotFoundException {
     super.fromData(in);
-    callbackArg = DataSerializer.readObject(in);
+    callbackArg = InternalDataSerializer.readUserObject(in);
   }
 
   @Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchBulkEntriesMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchBulkEntriesMessage.java
index 3604a3b..25f7fa9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchBulkEntriesMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchBulkEntriesMessage.java
@@ -44,6 +44,7 @@ import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.HeapDataOutputStream;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.BucketDump;
 import org.apache.geode.internal.cache.BucketRegion;
@@ -58,6 +59,7 @@ import org.apache.geode.internal.cache.versions.VersionTag;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LogMarker;
+import org.apache.geode.pdx.internal.PdxInstanceImpl;
 
 /**
  *
@@ -153,11 +155,13 @@ public class FetchBulkEntriesMessage extends PartitionMessage {
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
     super.fromData(in);
     this.keys = DataSerializer.readByte(in);
-    if (this.keys == KEY_LIST) {
-      this.bucketKeys = DataSerializer.readHashMap(in);
-    } else if (this.keys == ALL_KEYS) {
-      this.bucketIds = DataSerializer.readHashSet(in);
-    }
+    InternalDataSerializer.doWithPdxReadSerialized(() -> {
+      if (this.keys == KEY_LIST) {
+        this.bucketKeys = DataSerializer.readHashMap(in);
+      } else if (this.keys == ALL_KEYS) {
+        this.bucketIds = DataSerializer.readHashSet(in);
+      }
+    });
     this.regex = DataSerializer.readString(in);
     this.allowTombstones = DataSerializer.readPrimitiveBoolean(in);
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchEntryMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchEntryMessage.java
index ea08ef2..68bdf4d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchEntryMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchEntryMessage.java
@@ -180,7 +180,7 @@ public class FetchEntryMessage extends PartitionMessage {
   @Override
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
     super.fromData(in);
-    this.key = DataSerializer.readObject(in);
+    this.key = InternalDataSerializer.readUserObject(in);
   }
 
   @Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchKeysMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchKeysMessage.java
index bbb13cd..9d19864 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchKeysMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchKeysMessage.java
@@ -39,6 +39,7 @@ import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.HeapDataOutputStream;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.ForceReattemptException;
 import org.apache.geode.internal.cache.InitialImageOperation;
@@ -488,17 +489,19 @@ public class FetchKeysMessage extends PartitionMessage {
         try {
           ByteArrayInputStream byteStream = new ByteArrayInputStream(msg.chunk);
           DataInputStream in = new DataInputStream(byteStream);
-          while (in.available() > 0) {
-            Object key = DataSerializer.readObject(in);
-            if (key != null) {
-              synchronized (returnValue) {
-                returnValue.add(key);
+          InternalDataSerializer.doWithPdxReadSerialized(() -> {
+            while (in.available() > 0) {
+              Object key = DataSerializer.readObject(in);
+              if (key != null) {
+                synchronized (returnValue) {
+                  returnValue.add(key);
+                }
+              } else {
+                // null should signal the end of the set of keys
+                Assert.assertTrue(in.available() == 0);
               }
-            } else {
-              // null should signal the end of the set of keys
-              Assert.assertTrue(in.available() == 0);
             }
-          }
+          });
 
           synchronized (this.endLock) {
             chunksProcessed = chunksProcessed + 1;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/GetMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/GetMessage.java
index b13df25..65d56b8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/GetMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/GetMessage.java
@@ -241,8 +241,8 @@ public class GetMessage extends PartitionMessageWithDirectReply {
   @Override
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
     super.fromData(in);
-    this.key = DataSerializer.readObject(in);
-    this.cbArg = DataSerializer.readObject(in);
+    this.key = InternalDataSerializer.readUserObject(in);
+    this.cbArg = InternalDataSerializer.readUserObject(in);
     this.context = DataSerializer.readObject(in);
     this.returnTombstones = in.readBoolean();
   }
@@ -550,7 +550,8 @@ public class GetMessage extends PartitionMessageWithDirectReply {
                   return CachedDeserializableFactory.create(reply.valueInBytes,
                       getDistributionManager().getCache());
                 } else {
-                  return BlobHelper.deserializeBlob(reply.valueInBytes, reply.remoteVersion, null);
+                  return BlobHelper.deserializeBlob(reply.valueInBytes, reply.remoteVersion, null,
+                      true);
                 }
               } else {
                 return null;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PRTombstoneMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PRTombstoneMessage.java
index 807ab37..4f964da 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PRTombstoneMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PRTombstoneMessage.java
@@ -31,6 +31,7 @@ import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.ReplyMessage;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.SerializationVersions;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.BucketRegion;
@@ -133,7 +134,7 @@ public class PRTombstoneMessage extends PartitionMessageWithDirectReply
     int numKeys = in.readInt();
     this.keys = new HashSet<Object>(numKeys);
     for (int i = 0; i < numKeys; i++) {
-      this.keys.add(DataSerializer.readObject(in));
+      this.keys.add(InternalDataSerializer.readUserObject(in));
     }
     this.eventID = (EventID) DataSerializer.readObject(in);
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PRUpdateEntryVersionMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PRUpdateEntryVersionMessage.java
index e7cbba7..9e24c83 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PRUpdateEntryVersionMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PRUpdateEntryVersionMessage.java
@@ -36,6 +36,7 @@ import org.apache.geode.distributed.internal.ReplyException;
 import org.apache.geode.distributed.internal.ReplyMessage;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.cache.DataLocationException;
 import org.apache.geode.internal.cache.EntryEventImpl;
 import org.apache.geode.internal.cache.EventID;
@@ -181,7 +182,7 @@ public class PRUpdateEntryVersionMessage extends PartitionMessageWithDirectReply
   @Override
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
     super.fromData(in);
-    this.key = DataSerializer.readObject(in);
+    this.key = InternalDataSerializer.readUserObject(in);
     this.op = Operation.fromOrdinal(in.readByte());
     this.eventId = (EventID) DataSerializer.readObject(in);
     this.versionTag = DataSerializer.readObject(in);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
index 5295954..2d3233f 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
@@ -221,7 +221,7 @@ public class PutAllPRMessage extends PartitionMessageWithDirectReply {
     if ((flags & HAS_BRIDGE_CONTEXT) != 0) {
       this.bridgeContext = DataSerializer.readObject(in);
     }
-    this.callbackArg = DataSerializer.readObject(in);
+    this.callbackArg = InternalDataSerializer.readUserObject(in);
     this.putAllPRDataSize = (int) InternalDataSerializer.readUnsignedVL(in);
     this.putAllPRData = new PutAllEntryData[putAllPRDataSize];
     if (this.putAllPRDataSize > 0) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutMessage.java
index 1bef55b..cf88ba9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutMessage.java
@@ -514,7 +514,7 @@ public class PutMessage extends PartitionMessageWithDirectReply implements NewVa
     super.fromData(in);
 
     final int extraFlags = in.readUnsignedByte();
-    setKey(DataSerializer.readObject(in));
+    setKey(InternalDataSerializer.readUserObject(in));
     this.cbArg = DataSerializer.readObject(in);
     this.lastModified = in.readLong();
     this.op = Operation.fromOrdinal(in.readByte());
@@ -528,7 +528,7 @@ public class PutMessage extends PartitionMessageWithDirectReply implements NewVa
     InternalDataSerializer.invokeFromData(this.eventId, in);
 
     if ((flags & HAS_EXPECTED_OLD_VAL) != 0) {
-      this.expectedOldValue = DataSerializer.readObject(in);
+      this.expectedOldValue = InternalDataSerializer.readUserObject(in);
     }
     /*
      * this.hasOldValue = in.readBoolean(); if (this.hasOldValue){
@@ -989,7 +989,7 @@ public class PutMessage extends PartitionMessageWithDirectReply implements NewVa
       super.fromData(in);
       this.result = in.readBoolean();
       this.op = Operation.fromOrdinal(in.readByte());
-      this.oldValue = DataSerializer.readObject(in);
+      this.oldValue = InternalDataSerializer.readUserObject(in);
       this.versionTag = (VersionTag) DataSerializer.readObject(in);
     }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/QueryMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/QueryMessage.java
index 1ab675e..1e9f6e8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/QueryMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/QueryMessage.java
@@ -44,6 +44,7 @@ import org.apache.geode.distributed.internal.ReplyException;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.distributed.internal.streaming.StreamingOperation.StreamingReplyMessage;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.NanoTimer;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.ForceReattemptException;
@@ -299,7 +300,9 @@ public class QueryMessage extends StreamingPartitionOperation.StreamingPartition
     super.fromData(in);
     this.queryString = DataSerializer.readString(in);
     this.buckets = DataSerializer.readArrayList(in);
-    this.parameters = DataSerializer.readObjectArray(in);
+    InternalDataSerializer.doWithPdxReadSerialized(() -> {
+      this.parameters = DataSerializer.readObjectArray(in);
+    });
     this.cqQuery = DataSerializer.readBoolean(in);
     this.isPdxSerialized = DataSerializer.readBoolean(in);
     this.traceOn = DataSerializer.readBoolean(in);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
index a90eee7..868247e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
@@ -216,7 +216,7 @@ public class RemoveAllPRMessage extends PartitionMessageWithDirectReply {
       this.bridgeContext = DataSerializer.readObject(in);
     }
     Version sourceVersion = InternalDataSerializer.getVersionForDataStream(in);
-    this.callbackArg = DataSerializer.readObject(in);
+    this.callbackArg = InternalDataSerializer.readUserObject(in);
     this.removeAllPRDataSize = (int) InternalDataSerializer.readUnsignedVL(in);
     this.removeAllPRData = new RemoveAllEntryData[removeAllPRDataSize];
     if (this.removeAllPRDataSize > 0) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientInterestMessageImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientInterestMessageImpl.java
index f1b82d1..aab5c5e 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientInterestMessageImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientInterestMessageImpl.java
@@ -15,10 +15,13 @@
 
 package org.apache.geode.internal.cache.tier.sockets;
 
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 
 import org.apache.geode.DataSerializer;
-import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.internal.DataSerializableFixedID;
 import org.apache.geode.internal.Version;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
index 11ad04f..1440f57 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
@@ -1269,7 +1269,7 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
     // if (this._hasCqs) {
     // this._clientCqs = DataSerializer.readHashMap(in);
     // }
-    this._callbackArgument = DataSerializer.readObject(in);
+    this._callbackArgument = InternalDataSerializer.readUserObject(in);
 
     CacheClientNotifier ccn = CacheClientNotifier.getInstance();
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ObjectPartList.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ObjectPartList.java
index e604ec6..fdeeec0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ObjectPartList.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ObjectPartList.java
@@ -26,6 +26,7 @@ import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.DataSerializer;
 import org.apache.geode.internal.DataSerializableFixedID;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.versions.VersionTag;
 import org.apache.geode.internal.logging.LogService;
@@ -224,7 +225,7 @@ public class ObjectPartList implements DataSerializableFixedID, Releasable {
     if (numObjects > 0) {
       for (int index = 0; index < numObjects; ++index) {
         if (this.hasKeys) {
-          Object key = DataSerializer.readObject(in);
+          Object key = InternalDataSerializer.readUserObject(in);
           this.keys.add(key);
         }
         boolean isException = in.readBoolean();
@@ -235,7 +236,7 @@ public class ObjectPartList implements DataSerializableFixedID, Releasable {
           // ignore the exception string meant for native clients
           DataSerializer.readString(in);
         } else {
-          value = DataSerializer.readObject(in);
+          value = InternalDataSerializer.readUserObject(in);
         }
         this.objects.add(value);
       }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ObjectPartList651.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ObjectPartList651.java
index 887baf0..2ca11d9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ObjectPartList651.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ObjectPartList651.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 
 import org.apache.geode.DataSerializer;
 import org.apache.geode.internal.DataSerializableFixedID;
+import org.apache.geode.internal.InternalDataSerializer;
 
 /**
  * Adds one more object type which indicates that the key is not present at the server.
@@ -130,7 +131,7 @@ public class ObjectPartList651 extends ObjectPartList {
     if (numObjects > 0) {
       for (int index = 0; index < numObjects; ++index) {
         if (keysPresent) {
-          Object key = DataSerializer.readObject(in);
+          Object key = InternalDataSerializer.readUserObject(in);
           this.keys.add(key);
         }
         byte objectType = in.readByte();
@@ -142,7 +143,7 @@ public class ObjectPartList651 extends ObjectPartList {
           // ignore the exception string meant for native clients
           DataSerializer.readString(in);
         } else {
-          value = DataSerializer.readObject(in);
+          value = InternalDataSerializer.readUserObject(in);
         }
         this.objects.add(value);
       }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/VersionedObjectList.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/VersionedObjectList.java
index 0dc4cb7..2ea9c6f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/VersionedObjectList.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/VersionedObjectList.java
@@ -463,7 +463,7 @@ public class VersionedObjectList extends ObjectPartList implements Externalizabl
         logger.trace(LogMarker.VERSIONED_OBJECT_LIST, "reading {} keys", size);
       }
       for (int i = 0; i < size; i++) {
-        this.keys.add(DataSerializer.readObject(in));
+        this.keys.add(InternalDataSerializer.readUserObject(in));
       }
     }
     if (hasObjects) {
@@ -557,7 +557,7 @@ public class VersionedObjectList extends ObjectPartList implements Externalizabl
     } else if (this.serializeValues) {
       value = DataSerializer.readByteArray(in);
     } else {
-      value = DataSerializer.readObject(in);
+      value = InternalDataSerializer.readUserObject(in);
     }
     this.objects.add(value);
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/DistTxEntryEvent.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/DistTxEntryEvent.java
index a4ff33e..f493ba6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/DistTxEntryEvent.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/DistTxEntryEvent.java
@@ -101,14 +101,14 @@ public class DistTxEntryEvent extends EntryEventImpl {
     this.eventID = (EventID) DataSerializer.readObject(in);
     this.regionName = DataSerializer.readString(in);
     this.op = Operation.fromOrdinal(in.readByte());
-    Object key = DataSerializer.readObject(in);
+    Object key = InternalDataSerializer.readUserObject(in);
     Integer bucketId = DataSerializer.readInteger(in);
     this.keyInfo = new DistTxKeyInfo(key, null/*
                                                * value [DISTTX} TODO see if required
                                                */, null/*
                                                        * callbackarg [DISTTX] TODO
                                                        */, bucketId);
-    basicSetNewValue(DataSerializer.readObject(in), true);
+    basicSetNewValue(InternalDataSerializer.readUserObject(in), true);
 
     byte flags = DataSerializer.readByte(in);
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteContainsKeyValueMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteContainsKeyValueMessage.java
index 69fefc8..c6afc84 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteContainsKeyValueMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteContainsKeyValueMessage.java
@@ -34,6 +34,7 @@ import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.distributed.internal.ReplySender;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.RemoteOperationException;
@@ -140,7 +141,7 @@ public class RemoteContainsKeyValueMessage extends RemoteOperationMessageWithDir
   @Override
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
     super.fromData(in);
-    this.key = DataSerializer.readObject(in);
+    this.key = InternalDataSerializer.readUserObject(in);
     this.valueCheck = (flags & VALUE_CHECK) != 0;
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteDestroyMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteDestroyMessage.java
index c43aa0a..beca12e 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteDestroyMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteDestroyMessage.java
@@ -404,8 +404,8 @@ public class RemoteDestroyMessage extends RemoteOperationMessageWithDirectReply
   @Override
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
     super.fromData(in);
-    setKey(DataSerializer.readObject(in));
-    this.cbArg = DataSerializer.readObject(in);
+    setKey(InternalDataSerializer.readUserObject(in));
+    this.cbArg = InternalDataSerializer.readUserObject(in);
     this.op = Operation.fromOrdinal(in.readByte());
     if ((flags & HAS_BRIDGE_CONTEXT) != 0) {
       this.bridgeContext = DataSerializer.readObject(in);
@@ -422,7 +422,7 @@ public class RemoteDestroyMessage extends RemoteOperationMessageWithDirectReply
       in.readByte();
       setOldValBytes(DataSerializer.readByteArray(in));
     }
-    this.expectedOldValue = DataSerializer.readObject(in);
+    this.expectedOldValue = InternalDataSerializer.readUserObject(in);
     // to prevent bug 51024 always call readObject for versionTag
     // since toData always calls writeObject for versionTag.
     this.versionTag = DataSerializer.readObject(in);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteFetchEntryMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteFetchEntryMessage.java
index 503f2a3..3922e7d 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteFetchEntryMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteFetchEntryMessage.java
@@ -135,7 +135,7 @@ public class RemoteFetchEntryMessage extends RemoteOperationMessage {
   @Override
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
     super.fromData(in);
-    this.key = DataSerializer.readObject(in);
+    this.key = InternalDataSerializer.readUserObject(in);
   }
 
   @Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteFetchKeysMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteFetchKeysMessage.java
index 3de0dc3..ab903be 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteFetchKeysMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteFetchKeysMessage.java
@@ -44,6 +44,7 @@ import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.HeapDataOutputStream;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.cache.InitialImageOperation;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.PartitionedRegion;
@@ -52,6 +53,7 @@ import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LogMarker;
 import org.apache.geode.internal.util.ObjectIntProcedure;
+import org.apache.geode.pdx.internal.PdxInstanceImpl;
 
 public class RemoteFetchKeysMessage extends RemoteOperationMessage {
 
@@ -389,17 +391,19 @@ public class RemoteFetchKeysMessage extends RemoteOperationMessage {
 
         ByteArrayInputStream byteStream = new ByteArrayInputStream(msg.chunk);
         DataInputStream in = new DataInputStream(byteStream);
-        while (in.available() > 0) {
-          Object key = DataSerializer.readObject(in);
-          if (key != null) {
-            synchronized (returnValue) {
-              returnValue.add(key);
+        InternalDataSerializer.doWithPdxReadSerialized(() -> {
+          while (in.available() > 0) {
+            Object key = DataSerializer.readObject(in);
+            if (key != null) {
+              synchronized (returnValue) {
+                returnValue.add(key);
+              }
+            } else {
+              // null should signal the end of the set of keys
+              Assert.assertTrue(in.available() == 0);
             }
-          } else {
-            // null should signal the end of the set of keys
-            Assert.assertTrue(in.available() == 0);
           }
-        }
+        });
 
         synchronized (this.endLock) {
           chunksProcessed = chunksProcessed + 1;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteFetchVersionMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteFetchVersionMessage.java
index c6f866e..67625ec 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteFetchVersionMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteFetchVersionMessage.java
@@ -33,6 +33,7 @@ import org.apache.geode.distributed.internal.ReplyException;
 import org.apache.geode.distributed.internal.ReplyMessage;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.NanoTimer;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.RegionEntry;
@@ -95,7 +96,7 @@ public class RemoteFetchVersionMessage extends RemoteOperationMessage {
   @Override
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
     super.fromData(in);
-    this.key = DataSerializer.readObject(in);
+    this.key = InternalDataSerializer.readUserObject(in);
   }
 
   @Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteGetMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteGetMessage.java
index 76fb378..92b0388 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteGetMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteGetMessage.java
@@ -53,6 +53,7 @@ import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LogMarker;
 import org.apache.geode.internal.offheap.OffHeapHelper;
 import org.apache.geode.internal.util.BlobHelper;
+import org.apache.geode.pdx.internal.PdxInstanceImpl;
 
 /**
  * This message is used as the request for a get operation done in a transaction that is hosted on a
@@ -146,8 +147,8 @@ public class RemoteGetMessage extends RemoteOperationMessageWithDirectReply {
   @Override
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
     super.fromData(in);
-    this.key = DataSerializer.readObject(in);
-    this.cbArg = DataSerializer.readObject(in);
+    this.key = InternalDataSerializer.readUserObject(in);
+    this.cbArg = InternalDataSerializer.readUserObject(in);
     this.context = DataSerializer.readObject(in);
   }
 
@@ -366,7 +367,7 @@ public class RemoteGetMessage extends RemoteOperationMessageWithDirectReply {
             return CachedDeserializableFactory.create(reply.valueInBytes,
                 getDistributionManager().getCache());
           } else {
-            return BlobHelper.deserializeBlob(reply.valueInBytes, reply.remoteVersion, null);
+            return BlobHelper.deserializeBlob(reply.valueInBytes, reply.remoteVersion, null, true);
           }
         }
         return null;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemotePutMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemotePutMessage.java
index 9e4083b..0e85868 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemotePutMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemotePutMessage.java
@@ -448,12 +448,12 @@ public class RemotePutMessage extends RemoteOperationMessageWithDirectReply
   @Override
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
     super.fromData(in);
-    setKey(DataSerializer.readObject(in));
+    setKey(InternalDataSerializer.readUserObject(in));
 
     final int extraFlags = in.readUnsignedByte();
     this.deserializationPolicy =
         (byte) (extraFlags & DistributedCacheOperation.DESERIALIZATION_POLICY_MASK);
-    this.cbArg = DataSerializer.readObject(in);
+    this.cbArg = InternalDataSerializer.readUserObject(in);
     this.lastModified = in.readLong();
     this.op = Operation.fromOrdinal(in.readByte());
     if ((extraFlags & HAS_BRIDGE_CONTEXT) != 0) {
@@ -466,7 +466,7 @@ public class RemotePutMessage extends RemoteOperationMessageWithDirectReply
     InternalDataSerializer.invokeFromData(this.eventId, in);
 
     if ((flags & HAS_EXPECTED_OLD_VAL) != 0) {
-      this.expectedOldValue = DataSerializer.readObject(in);
+      this.expectedOldValue = InternalDataSerializer.readUserObject(in);
     }
 
     if (this.hasOldValue) {
@@ -838,7 +838,7 @@ public class RemotePutMessage extends RemoteOperationMessageWithDirectReply
       byte flags = (byte) (in.readByte() & 0xff);
       this.result = (flags & FLAG_RESULT) != 0;
       this.op = Operation.fromOrdinal(in.readByte());
-      this.oldValue = DataSerializer.readObject(in);
+      this.oldValue = InternalDataSerializer.readUserObject(in);
       if ((flags & FLAG_HASVERSION) != 0) {
         boolean persistentTag = (flags & FLAG_PERSISTENT) != 0;
         this.versionTag = VersionTag.create(persistentTag, in);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
index a391238..ae35334 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
@@ -719,7 +719,8 @@ public class GatewaySenderEventImpl
     this.valueIsObject = in.readByte();
     deserializeKey(in);
     this.value = DataSerializer.readByteArray(in);
-    this.callbackArgument = (GatewaySenderEventCallbackArgument) DataSerializer.readObject(in);
+    this.callbackArgument =
+        (GatewaySenderEventCallbackArgument) InternalDataSerializer.readUserObject(in);
     this.possibleDuplicate = in.readBoolean();
     this.creationTime = in.readLong();
     this.bucketId = in.readInt();
@@ -729,7 +730,7 @@ public class GatewaySenderEventImpl
   }
 
   protected void deserializeKey(DataInput in) throws IOException, ClassNotFoundException {
-    this.key = DataSerializer.readObject(in);
+    this.key = InternalDataSerializer.readUserObject(in);
   }
 
   @Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation.java
index 2d6f118..e5847f0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation.java
@@ -40,6 +40,7 @@ import org.apache.geode.distributed.internal.ReplyMessage;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.DataSerializableFixedID;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.InitialImageOperation;
@@ -301,7 +302,7 @@ public class GatewaySenderQueueEntrySynchronizationOperation {
 
     @Override
     public void fromData(DataInput in) throws IOException, ClassNotFoundException {
-      this.key = DataSerializer.readObject(in);
+      this.key = InternalDataSerializer.readUserObject(in);
       this.entryVersion = DataSerializer.readObject(in);
     }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation.java
index debb005..8d59933 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation.java
@@ -30,6 +30,7 @@ import org.apache.geode.cache.wan.GatewayQueueEvent;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
 import org.apache.geode.distributed.internal.ConflationKey;
 import org.apache.geode.distributed.internal.DirectReplyProcessor;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.cache.DistributedCacheOperation;
 import org.apache.geode.internal.cache.DistributedRegion;
 import org.apache.geode.internal.cache.EntryEventImpl;
@@ -182,7 +183,7 @@ public class BatchDestroyOperation extends DistributedCacheOperation {
     public void fromData(DataInput in) throws IOException, ClassNotFoundException {
       super.fromData(in);
       this.eventId = (EventID) DataSerializer.readObject(in);
-      this.key = DataSerializer.readObject(in);
+      this.key = InternalDataSerializer.readUserObject(in);
       Boolean hasTailKey = DataSerializer.readBoolean(in);
       if (hasTailKey.booleanValue()) {
         this.tailKey = DataSerializer.readLong(in);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/util/BlobHelper.java b/geode-core/src/main/java/org/apache/geode/internal/util/BlobHelper.java
index 46a1f67..6a03741 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/util/BlobHelper.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/util/BlobHelper.java
@@ -26,6 +26,7 @@ import org.apache.geode.internal.Version;
 import org.apache.geode.internal.offheap.StoredObject;
 import org.apache.geode.internal.offheap.annotations.Unretained;
 import org.apache.geode.pdx.internal.PdxInputStream;
+import org.apache.geode.pdx.internal.TypeRegistry;
 
 /**
  * A "blob" is a serialized representation of an object into a byte[]. BlobHelper provides utility
@@ -77,6 +78,21 @@ public class BlobHelper {
   /**
    * A blob is a serialized Object. This method returns the deserialized object.
    */
+  public static Object deserializeBlob(byte[] blob, Version version, ByteArrayDataInput in,
+      boolean allowPdxReadSerialized) throws IOException, ClassNotFoundException {
+    boolean readPdx = TypeRegistry.getPdxReadSerialized();
+    if (readPdx != allowPdxReadSerialized) {
+      TypeRegistry.setPdxReadSerialized(allowPdxReadSerialized);
+    }
+    try {
+      return deserializeBlob(blob, version, in);
+    } finally {
+      if (readPdx != allowPdxReadSerialized) {
+        TypeRegistry.setPdxReadSerialized(readPdx);
+      }
+    }
+  }
+
   public static Object deserializeBlob(byte[] blob, Version version, ByteArrayDataInput in)
       throws IOException, ClassNotFoundException {
     Object result;
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/CliFunctionResult.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/CliFunctionResult.java
index bc360bb..33da231 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/CliFunctionResult.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/CliFunctionResult.java
@@ -25,8 +25,10 @@ import java.util.List;
 
 import org.apache.geode.DataSerializer;
 import org.apache.geode.internal.DataSerializableFixedID;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.Version;
 import org.apache.geode.management.internal.configuration.domain.XmlEntity;
+import org.apache.geode.pdx.internal.PdxInstanceImpl;
 
 public class CliFunctionResult implements Comparable<CliFunctionResult>, DataSerializableFixedID {
   private String memberIdOrName;
@@ -164,7 +166,9 @@ public class CliFunctionResult implements Comparable<CliFunctionResult>, DataSer
     this.memberIdOrName = DataSerializer.readString(in);
     this.successful = DataSerializer.readPrimitiveBoolean(in);
     this.xmlEntity = DataSerializer.readObject(in);
-    this.serializables = (Serializable[]) DataSerializer.readObjectArray(in);
+    InternalDataSerializer.doWithPdxReadSerialized(() -> {
+      this.serializables = (Serializable[]) DataSerializer.readObjectArray(in);
+    });
     this.throwable = DataSerializer.readObject(in);
     this.byteData = DataSerializer.readByteArray(in);
   }
@@ -172,7 +176,9 @@ public class CliFunctionResult implements Comparable<CliFunctionResult>, DataSer
   public void fromDataPre_GFE_8_0_0_0(DataInput in) throws IOException, ClassNotFoundException {
     this.memberIdOrName = DataSerializer.readString(in);
     this.throwable = DataSerializer.readObject(in);
-    this.serializables = (Serializable[]) DataSerializer.readObjectArray(in);
+    InternalDataSerializer.doWithPdxReadSerialized(() -> {
+      this.serializables = (Serializable[]) DataSerializer.readObjectArray(in);
+    });
   }
 
   public boolean isSuccessful() {
diff --git a/geode-core/src/main/java/org/apache/geode/pdx/internal/PdxInstanceImpl.java b/geode-core/src/main/java/org/apache/geode/pdx/internal/PdxInstanceImpl.java
index f55df95..c7166a3 100644
--- a/geode-core/src/main/java/org/apache/geode/pdx/internal/PdxInstanceImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/pdx/internal/PdxInstanceImpl.java
@@ -121,18 +121,6 @@ public class PdxInstanceImpl extends PdxReaderImpl implements InternalPdxInstanc
     return dis;
   }
 
-  public static boolean getPdxReadSerialized() {
-    return pdxGetObjectInProgress.get() == null;
-  }
-
-  public static void setPdxReadSerialized(boolean readSerialized) {
-    if (!readSerialized) {
-      pdxGetObjectInProgress.set(true);
-    } else {
-      pdxGetObjectInProgress.remove();
-    }
-  }
-
   @Override
   public Object getField(String fieldName) {
     return getUnmodifiableReader(fieldName).readField(fieldName);
@@ -234,15 +222,16 @@ public class PdxInstanceImpl extends PdxReaderImpl implements InternalPdxInstanc
       }
       return this;
     }
-    boolean wouldReadSerialized = PdxInstanceImpl.getPdxReadSerialized();
+
+    boolean wouldReadSerialized = TypeRegistry.getPdxReadSerialized();
     if (!wouldReadSerialized) {
       return getUnmodifiableReader().basicGetObject();
     } else {
-      PdxInstanceImpl.setPdxReadSerialized(false);
+      TypeRegistry.setPdxReadSerialized(false);
       try {
         return getUnmodifiableReader().basicGetObject();
       } finally {
-        PdxInstanceImpl.setPdxReadSerialized(true);
+        TypeRegistry.setPdxReadSerialized(true);
       }
     }
   }
diff --git a/geode-core/src/main/java/org/apache/geode/pdx/internal/TypeRegistry.java b/geode-core/src/main/java/org/apache/geode/pdx/internal/TypeRegistry.java
index 2479d29..76aed3e 100644
--- a/geode-core/src/main/java/org/apache/geode/pdx/internal/TypeRegistry.java
+++ b/geode-core/src/main/java/org/apache/geode/pdx/internal/TypeRegistry.java
@@ -44,6 +44,20 @@ public class TypeRegistry {
   private static final boolean DISABLE_TYPE_REGISTRY =
       Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "TypeRegistry.DISABLE_PDX_REGISTRY");
 
+  /**
+   * setting this to true disables DataSerializer from returning a PdxInstance if
+   * the instance can be deserialized into a POJO
+   */
+  private static final ThreadLocal<Boolean> disablePdxReadSerialized = new ThreadLocal<Boolean>();
+
+  /**
+   * setting this to true enables pdx-read-serialized for the current thread, regardless
+   * of the cache's setting for this attribute
+   */
+  private final ThreadLocal<Boolean> pdxReadSerializedOverride =
+      ThreadLocal.withInitial(() -> Boolean.FALSE);
+
+
   private final Map<Integer, PdxType> idToType = new CopyOnWriteHashMap<>();
 
   private final Map<PdxType, Integer> typeToId = new CopyOnWriteHashMap<>();
@@ -66,8 +80,6 @@ public class TypeRegistry {
 
   private final InternalCache cache;
 
-  private final ThreadLocal<Boolean> pdxReadSerializedOverride =
-      ThreadLocal.withInitial(() -> Boolean.FALSE);
 
   public TypeRegistry(InternalCache cache, boolean disableTypeRegistry) {
     this.cache = cache;
@@ -83,6 +95,24 @@ public class TypeRegistry {
     }
   }
 
+  public static boolean getPdxReadSerialized() {
+    return disablePdxReadSerialized.get() == null;
+  }
+
+  /**
+   * Setting this to true causes pdx-read-serialized to be respected, which is the
+   * default. Setting this to false disables pdx-read-serialized while deserializing
+   * objects. This takes precendence over setPdxReadSerializedOverride, which affects
+   * the cache's setting of that attribute.
+   */
+  public static void setPdxReadSerialized(boolean readSerialized) {
+    if (!readSerialized) {
+      disablePdxReadSerialized.set(true);
+    } else {
+      disablePdxReadSerialized.remove();
+    }
+  }
+
   /*
    * Test Hook to clear the type registry
    */
@@ -317,6 +347,7 @@ public class TypeRegistry {
    */
   private static volatile boolean pdxSerializerWasSet = false;
 
+
   public static void init() {
     pdxSerializerWasSet = false;
   }
@@ -545,10 +576,17 @@ public class TypeRegistry {
     return result;
   }
 
+  /**
+   * should the cache's setting for pdx-read-serialized be ignored and PdxInstances
+   * be returned?
+   */
   public Boolean getPdxReadSerializedOverride() {
     return pdxReadSerializedOverride.get();
   }
 
+  /**
+   * enable pdx-read-serialized, ignoring the cache's setting.
+   */
   public void setPdxReadSerializedOverride(boolean overridePdxReadSerialized) {
     pdxReadSerializedOverride.set(overridePdxReadSerialized);
   }
diff --git a/geode-core/src/test/java/org/apache/geode/codeAnalysis/ClassAndMethodDetails.java b/geode-core/src/test/java/org/apache/geode/codeAnalysis/ClassAndMethodDetails.java
index d0e0d66..9916384 100644
--- a/geode-core/src/test/java/org/apache/geode/codeAnalysis/ClassAndMethodDetails.java
+++ b/geode-core/src/test/java/org/apache/geode/codeAnalysis/ClassAndMethodDetails.java
@@ -95,11 +95,7 @@ public class ClassAndMethodDetails implements Comparable {
     for (CompiledMethod method : methods) {
       CompiledCode c = method.getCode();
       if (c != null) {
-        sb.append(method.name()).append(',').append(c.code.length).append(',');
-        for (int i = 0; i < c.code.length; i++) {
-          sb.append(hexChars[(c.code[i] & 0xff)]);
-        }
-        sb.append("\n");
+        sb.append(method.name()).append(',').append(c.code.length).append("\n");
       }
     }
     return sb.toString();
diff --git a/geode-core/src/test/java/org/apache/geode/internal/InternalDataSerializerSerializationWhitelistTest.java b/geode-core/src/test/java/org/apache/geode/internal/InternalDataSerializerSerializationWhitelistTest.java
index c521eab..1b3f487 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/InternalDataSerializerSerializationWhitelistTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/InternalDataSerializerSerializationWhitelistTest.java
@@ -21,6 +21,7 @@ import org.apache.geode.DataSerializer;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.DistributionConfigImpl;
 import org.apache.geode.internal.lang.ClassUtils;
+import org.apache.geode.pdx.internal.TypeRegistry;
 import org.apache.geode.test.junit.categories.UnitTest;
 
 /*
@@ -45,6 +46,7 @@ public class InternalDataSerializerSerializationWhitelistTest {
 
   @Before
   public void setUp() {
+    TypeRegistry.init();
     Assume.assumeTrue("ObjectInputFilter is present in this JVM (post- 8.111)",
         hasObjectInputFilter());
     outputStream = new HeapDataOutputStream(Version.CURRENT);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/EntryEventSerializationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/EntryEventSerializationTest.java
index 50ab538..8027611 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/EntryEventSerializationTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/EntryEventSerializationTest.java
@@ -39,6 +39,7 @@ import org.mockito.ArgumentCaptor;
 import org.apache.geode.SerializationException;
 import org.apache.geode.cache.Scope;
 import org.apache.geode.pdx.internal.PdxInstanceImpl;
+import org.apache.geode.pdx.internal.TypeRegistry;
 import org.apache.geode.test.junit.categories.UnitTest;
 
 @Category(UnitTest.class)
@@ -54,6 +55,8 @@ public class EntryEventSerializationTest {
 
   @Before
   public void setUp() {
+    TypeRegistry.init();
+
     System.setProperty(GEODE_PREFIX + EARLY_ENTRY_EVENT_SERIALIZATION, "true");
 
     region = mock(InternalRegion.class);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionRegionConfigTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionRegionConfigTest.java
index 0632b71..b580a56 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionRegionConfigTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionRegionConfigTest.java
@@ -31,6 +31,7 @@ import org.apache.geode.cache.PartitionAttributes;
 import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.Scope;
 import org.apache.geode.internal.util.BlobHelper;
+import org.apache.geode.pdx.internal.TypeRegistry;
 import org.apache.geode.test.junit.categories.UnitTest;
 
 @Category(UnitTest.class)
@@ -49,6 +50,7 @@ public class PartitionRegionConfigTest {
 
   @Before
   public void setUp() {
+    TypeRegistry.init();
     prId = 0;
     path = null;
     partitionAttributes = new PartitionAttributesFactory().create();
diff --git a/geode-core/src/test/java/org/apache/geode/pdx/AutoSerializableJUnitTest.java b/geode-core/src/test/java/org/apache/geode/pdx/AutoSerializableJUnitTest.java
index 133de3a..79c22c9 100644
--- a/geode-core/src/test/java/org/apache/geode/pdx/AutoSerializableJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/pdx/AutoSerializableJUnitTest.java
@@ -27,7 +27,6 @@ import java.net.URLClassLoader;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.jmock.auto.Auto;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -36,7 +35,6 @@ import org.junit.experimental.categories.Category;
 import org.apache.geode.DataSerializer;
 import org.apache.geode.SerializationException;
 import org.apache.geode.cache.CacheFactory;
-import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.internal.HeapDataOutputStream;
 import org.apache.geode.internal.PdxSerializerObject;
 import org.apache.geode.internal.Version;
@@ -44,6 +42,7 @@ import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.pdx.internal.AutoSerializableManager;
 import org.apache.geode.pdx.internal.PdxField;
 import org.apache.geode.pdx.internal.PdxInstanceImpl;
+import org.apache.geode.pdx.internal.TypeRegistry;
 import org.apache.geode.test.junit.categories.IntegrationTest;
 import org.apache.geode.test.junit.categories.SerializationTest;
 
@@ -143,7 +142,7 @@ public class AutoSerializableJUnitTest {
     }
 
     // disable pdx instances to make sure we can deserialize without calling PdxInstance.getObject
-    PdxInstanceImpl.setPdxReadSerialized(false);
+    TypeRegistry.setPdxReadSerialized(false);
     try {
       DomainObjectPdxAuto result = (DomainObjectPdxAuto) DataSerializer
           .readObject(new DataInputStream(new ByteArrayInputStream(out.toByteArray())));
@@ -153,7 +152,7 @@ public class AutoSerializableJUnitTest {
       assertEquals(DomainObjectPdxAuto.Day.FRIDAY, result.get("anEnum"));
       assertEquals(4, ((List) result.get("string_list")).size());
     } finally {
-      PdxInstanceImpl.setPdxReadSerialized(true);
+      TypeRegistry.setPdxReadSerialized(true);
     }
   }
 
diff --git a/geode-core/src/test/java/org/apache/geode/pdx/PdxSerializerJUnitTest.java b/geode-core/src/test/java/org/apache/geode/pdx/PdxSerializerJUnitTest.java
new file mode 100644
index 0000000..6f13f73
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/pdx/PdxSerializerJUnitTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.pdx;
+
+import static org.apache.geode.pdx.PdxSerializerRegressionTest.TestSerializer;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.DataSerializer;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.internal.DataSerializableFixedID;
+import org.apache.geode.internal.HeapDataOutputStream;
+import org.apache.geode.internal.InternalDataSerializer;
+import org.apache.geode.internal.Version;
+import org.apache.geode.test.junit.categories.SerializationTest;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category({UnitTest.class, SerializationTest.class})
+public class PdxSerializerJUnitTest {
+
+  Cache cache;
+
+  @Before
+  public void setup() {
+    // create a cache so we can work with PDX
+    cache = (new CacheFactory()).setPdxSerializer(new TestSerializer()).setPdxReadSerialized(true)
+        .set("mcast-port", "0").create();
+  }
+
+  @After
+  public void teardown() {
+    cache.close();
+    cache = null;
+  }
+
+  /**
+   * When deserializing DataSerializableFixedID objects DataSerializer disables
+   * pdx-read-serialized to protect Geode message classes from inadvertently reading
+   * PdxInstances instead of expected deserialized values in their fromData methods.
+   * This test ensures that readObject() fully deserializes a Set that is pdx-serialized
+   * and that other methods that respect the pdx-read-serialized setting return
+   * PdxInstances.
+   */
+  @Test
+  public void deserializingDSFIDDoesNotReturnPdxInstance()
+      throws IOException, ClassNotFoundException {
+
+    HeapDataOutputStream hdos = new HeapDataOutputStream(200, Version.CURRENT);
+    TestDSFID testObject = new TestDSFID();
+    Set setOfInts = new HashSet();
+    setOfInts.add(1);
+    setOfInts.add(2);
+    // The TestSerializer will PDX-serialize a Collections.UnmodifiableSet
+    testObject.mySet = Collections.unmodifiableSet(setOfInts);
+    DataSerializer.writeObject(testObject, hdos);
+    byte[] serializedForm = hdos.toByteArray();
+    ByteArrayInputStream bytesIn = new ByteArrayInputStream(serializedForm);
+    DataInputStream inputStream = new DataInputStream(bytesIn);
+
+    testObject = DataSerializer.readObject(inputStream);
+    assertThat(testObject.mySet.getClass().getName())
+        .isEqualTo("java.util.Collections$UnmodifiableSet");
+    assertThat(testObject.readUserObjectResult).isInstanceOf(PdxInstance.class);
+    assertThat(testObject.doWithPdxSerializedResult).isInstanceOf(PdxInstance.class);
+  }
+
+
+  static class TestDSFID implements DataSerializableFixedID {
+    Object mySet;
+    Object readUserObjectResult;
+    Object doWithPdxSerializedResult;
+
+    public TestDSFID() {}
+
+    @Override
+    public int getDSFID() {
+      // NO_FIXED_ID tells DataSerializer to write the class name on the output stream.
+      // It is only used for testing.
+      return NO_FIXED_ID;
+    }
+
+    @Override
+    public void toData(DataOutput out) throws IOException {
+      DataSerializer.writeObject(mySet, out);
+      DataSerializer.writeObject(mySet, out);
+      DataSerializer.writeObject(mySet, out);
+    }
+
+    @Override
+    public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+      // toData serialized a Set three times. TestSerializer has pdx-serialized
+      // these sets.
+
+      // readObject should return a Set
+      mySet = DataSerializer.readObject(in);
+
+      // readUserObject should return a PdxInstance
+      readUserObjectResult = InternalDataSerializer.readUserObject(in);
+
+      // readObject in the context of doWithPdxReadSerialized should return a PdxInstance
+      InternalDataSerializer.doWithPdxReadSerialized(() -> {
+        doWithPdxSerializedResult = DataSerializer.readObject(in);
+      });
+    }
+
+    @Override
+    public Version[] getSerializationVersions() {
+      return new Version[0];
+    }
+  }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/pdx/PdxSerializerRegressionTest.java b/geode-core/src/test/java/org/apache/geode/pdx/PdxSerializerRegressionTest.java
new file mode 100644
index 0000000..5530b92
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/pdx/PdxSerializerRegressionTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.pdx;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.test.dunit.DistributedTestUtils;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.categories.SerializationTest;
+
+@Category({DistributedTest.class, SerializationTest.class})
+public class PdxSerializerRegressionTest {
+  @Rule
+  public transient ClusterStartupRule startupRule = new ClusterStartupRule();
+
+
+  /**
+   * A PdxSerializer may PDX-serialize any object it chooses, even JDK classes used in
+   * Geode messaging. This caused initial image transfer for the _PR partition region
+   * metadata to fail when pdx-read-serialized was set to true due to a class-cast
+   * exception when a PdxInstance was returned by DataSerializer.readObject() instead
+   * of the expected collection of PartitionConfig objects.
+   */
+  @Test
+  public void serializerHandlingSetDoesNotBreakInitialImageTransferForPRRegistry() {
+    MemberVM server1 = startupRule.startServerVM(1,
+        x -> x.withConnectionToLocator(DistributedTestUtils.getLocatorPort())
+            .withPDXReadSerialized().withPdxSerializer(new TestSerializer()));
+    createRegion(server1, RegionShortcut.PARTITION);
+    MemberVM server2 = startupRule.startServerVM(2,
+        x -> x.withConnectionToLocator(DistributedTestUtils.getLocatorPort())
+            .withPDXReadSerialized().withPdxSerializer(new TestSerializer()));
+    createRegion(server2, RegionShortcut.PARTITION);
+    assertThat((boolean) server1.invoke(() -> {
+      return TestSerializer.serializerInvoked;
+    })).isTrue();
+    assertThat((boolean) server2.invoke(() -> {
+      return TestSerializer.serializerInvoked;
+    })).isTrue();
+  }
+
+  private void createRegion(MemberVM vm, RegionShortcut regionShortcut) {
+    vm.invoke("create region", () -> {
+      ClusterStartupRule.getCache().createRegionFactory(regionShortcut)
+          .addAsyncEventQueueId("queue1").create("region");
+    });
+  }
+
+  static class TestSerializer implements PdxSerializer, Serializable {
+    static boolean serializerInvoked;
+
+    @Override
+    public boolean toData(Object o, PdxWriter out) {
+      if (o.getClass().getName().equals("java.util.Collections$UnmodifiableSet")) {
+        serializerInvoked = true;
+        System.out.println("TestSerializer is serializing a Set");
+        Set set = (Set<Object>) o;
+        out.writeInt("size", set.size());
+        int elementIndex = 0;
+        for (Object element : set) {
+          out.writeObject("element" + (elementIndex++), element);
+        }
+        return true;
+      }
+      return false;
+    }
+
+    @Override
+    public Object fromData(Class<?> clazz, PdxReader in) {
+      if (clazz.getName().equals("java.util.Collections$UnmodifiableSet")) {
+        System.out.println("TestSerializer is deserializing a Set");
+        int size = in.readInt("size");
+        Set result = new HashSet(size);
+        for (int elementIndex = 0; elementIndex < size; elementIndex++) {
+          result.add(in.readObject("element" + elementIndex));
+        }
+        return Collections.unmodifiableSet(result);
+      }
+      return null;
+    }
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/test/junit/rules/VMProvider.java b/geode-core/src/test/java/org/apache/geode/test/junit/rules/VMProvider.java
index 9898305..27dc234 100644
--- a/geode-core/src/test/java/org/apache/geode/test/junit/rules/VMProvider.java
+++ b/geode-core/src/test/java/org/apache/geode/test/junit/rules/VMProvider.java
@@ -41,6 +41,10 @@ public abstract class VMProvider {
     getVM().invoke(runnable);
   }
 
+  public void invoke(final String runnableName, final SerializableRunnableIF runnable) {
+    getVM().invoke(runnableName, runnable);
+  }
+
   public <T> T invoke(final SerializableCallableIF<T> callable) {
     return getVM().invoke(callable);
   }
diff --git a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index c2e98ee..0547446 100644
--- a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -75,11 +75,11 @@ fromData,126
 toData,125
 
 org/apache/geode/cache/query/internal/LinkedResultSet,2
-fromData,40
+fromData,30
 toData,46
 
 org/apache/geode/cache/query/internal/LinkedStructSet,2
-fromData,68
+fromData,40
 toData,66
 
 org/apache/geode/cache/query/internal/NWayMergeResults,2
@@ -99,19 +99,19 @@ fromData,106
 toData,116
 
 org/apache/geode/cache/query/internal/ResultsCollectionWrapper,2
-fromData,55
+fromData,40
 toData,60
 
 org/apache/geode/cache/query/internal/ResultsSet,2
-fromData,49
+fromData,37
 toData,70
 
 org/apache/geode/cache/query/internal/SortedResultSet,2
-fromData,40
+fromData,30
 toData,46
 
 org/apache/geode/cache/query/internal/SortedStructSet,2
-fromData,57
+fromData,40
 toData,64
 
 org/apache/geode/cache/query/internal/StructBag,2
@@ -119,11 +119,11 @@ fromData,16
 toData,16
 
 org/apache/geode/cache/query/internal/StructImpl,2
-fromData,72
+fromData,74
 toData,17
 
 org/apache/geode/cache/query/internal/StructSet,2
-fromData,58
+fromData,48
 toData,46
 
 org/apache/geode/cache/query/internal/Undefined,2
@@ -1177,86 +1177,10 @@ org/apache/geode/internal/cache/ReleaseClearLockMessage,2
 fromData,24
 toData,24
 
-org/apache/geode/internal/cache/tx/RemoteContainsKeyValueMessage,2
-fromData,33
-toData,14
-
-org/apache/geode/internal/cache/tx/RemoteContainsKeyValueMessage$RemoteContainsKeyValueReplyMessage,2
+org/apache/geode/internal/cache/RemoveCacheServerProfileMessage,2
 fromData,16
 toData,16
 
-org/apache/geode/internal/cache/tx/RemoteDestroyMessage,2
-fromData,131
-toData,135
-
-org/apache/geode/internal/cache/tx/RemoteDestroyMessage$DestroyReplyMessage,2
-fromData,52
-toData,57
-
-org/apache/geode/internal/cache/tx/RemoteFetchEntryMessage,2
-fromData,14
-toData,14
-
-org/apache/geode/internal/cache/tx/RemoteFetchEntryMessage$FetchEntryReplyMessage,2
-fromData,30
-toData,38
-
-org/apache/geode/internal/cache/tx/RemoteFetchVersionMessage,2
-fromData,14
-toData,14
-
-org/apache/geode/internal/cache/tx/RemoteFetchVersionMessage$FetchVersionReplyMessage,2
-fromData,17
-toData,14
-
-org/apache/geode/internal/cache/tx/RemoteGetMessage,2
-fromData,33
-toData,30
-
-org/apache/geode/internal/cache/tx/RemoteGetMessage$GetReplyMessage,2
-fromData,48
-toData,32
-
-org/apache/geode/internal/cache/tx/RemoteInvalidateMessage$InvalidateReplyMessage,2
-fromData,52
-toData,57
-
-org/apache/geode/internal/cache/tx/RemoteOperationMessage,2
-fromData,43
-toData,103
-
-org/apache/geode/internal/cache/tx/RemotePutAllMessage,2
-fromData,223
-toData,173
-
-org/apache/geode/internal/cache/tx/RemotePutAllMessage$PutAllReplyMessage,2
-fromData,17
-toData,14
-
-org/apache/geode/internal/cache/tx/RemotePutMessage,2
-fromData,223
-toData,252
-
-org/apache/geode/internal/cache/tx/RemotePutMessage$PutReplyMessage,2
-fromData,81
-toData,94
-
-org/apache/geode/internal/cache/tx/RemoteClearMessage,2
-fromData,13
-toData,18
-
-org/apache/geode/internal/cache/tx/RemoteClearMessage$RemoteClearReplyMessage,2
-fromData,6
-toData,6
-
-org/apache/geode/internal/cache/tx/RemoteRemoveAllMessage,2
-fromData,203
-toData,173
-
-org/apache/geode/internal/cache/tx/RemoteRemoveAllMessage$RemoveAllReplyMessage,2
-fromData,17
-toData,14
-
 org/apache/geode/internal/cache/RoleEventImpl,2
 fromData,63
 toData,72
@@ -1338,7 +1262,7 @@ fromData,173
 toData,77
 
 org/apache/geode/internal/cache/TXCommitMessage$RegionCommit$FarSideEntryOp,2
-fromData,171
+fromData,167
 toData,187
 
 org/apache/geode/internal/cache/TXEntryState$DistTxThinEntryState,2
@@ -1580,7 +1504,7 @@ fromData,35
 toData,24
 
 org/apache/geode/internal/cache/partitioned/FetchBulkEntriesMessage,2
-fromData,67
+fromData,43
 toData,67
 
 org/apache/geode/internal/cache/partitioned/FetchBulkEntriesMessage$FetchBulkEntriesReplyMessage,2
@@ -1744,7 +1668,7 @@ fromData,48
 toData,54
 
 org/apache/geode/internal/cache/partitioned/QueryMessage,2
-fromData,63
+fromData,65
 toData,63
 
 org/apache/geode/internal/cache/partitioned/RegionAdvisor$BucketProfileAndId,2
@@ -1755,22 +1679,6 @@ org/apache/geode/internal/cache/partitioned/RegionAdvisor$PartitionProfile,2
 fromData,53
 toData,34
 
-org/apache/geode/internal/cache/tx/RemoteFetchKeysMessage,2
-fromData,6
-toData,6
-
-org/apache/geode/internal/cache/tx/RemoteFetchKeysMessage$RemoteFetchKeysReplyMessage,2
-fromData,54
-toData,54
-
-org/apache/geode/internal/cache/tx/RemoteSizeMessage,2
-fromData,18
-toData,18
-
-org/apache/geode/internal/cache/tx/RemoteSizeMessage$SizeReplyMessage,2
-fromData,16
-toData,16
-
 org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage,2
 fromData,190
 toData,194
@@ -1954,6 +1862,102 @@ org/apache/geode/internal/cache/tx/DistTxEntryEvent,2
 fromData,100
 toData,121
 
+org/apache/geode/internal/cache/tx/RemoteClearMessage,2
+fromData,13
+toData,18
+
+org/apache/geode/internal/cache/tx/RemoteClearMessage$RemoteClearReplyMessage,2
+fromData,6
+toData,6
+
+org/apache/geode/internal/cache/tx/RemoteContainsKeyValueMessage,2
+fromData,33
+toData,14
+
+org/apache/geode/internal/cache/tx/RemoteContainsKeyValueMessage$RemoteContainsKeyValueReplyMessage,2
+fromData,16
+toData,16
+
+org/apache/geode/internal/cache/tx/RemoteDestroyMessage,2
+fromData,131
+toData,135
+
+org/apache/geode/internal/cache/tx/RemoteDestroyMessage$DestroyReplyMessage,2
+fromData,52
+toData,57
+
+org/apache/geode/internal/cache/tx/RemoteFetchEntryMessage,2
+fromData,14
+toData,14
+
+org/apache/geode/internal/cache/tx/RemoteFetchEntryMessage$FetchEntryReplyMessage,2
+fromData,30
+toData,38
+
+org/apache/geode/internal/cache/tx/RemoteFetchKeysMessage,2
+fromData,6
+toData,6
+
+org/apache/geode/internal/cache/tx/RemoteFetchKeysMessage$RemoteFetchKeysReplyMessage,2
+fromData,54
+toData,54
+
+org/apache/geode/internal/cache/tx/RemoteFetchVersionMessage,2
+fromData,14
+toData,14
+
+org/apache/geode/internal/cache/tx/RemoteFetchVersionMessage$FetchVersionReplyMessage,2
+fromData,17
+toData,14
+
+org/apache/geode/internal/cache/tx/RemoteGetMessage,2
+fromData,33
+toData,30
+
+org/apache/geode/internal/cache/tx/RemoteGetMessage$GetReplyMessage,2
+fromData,48
+toData,32
+
+org/apache/geode/internal/cache/tx/RemoteInvalidateMessage$InvalidateReplyMessage,2
+fromData,52
+toData,57
+
+org/apache/geode/internal/cache/tx/RemoteOperationMessage,2
+fromData,43
+toData,103
+
+org/apache/geode/internal/cache/tx/RemotePutAllMessage,2
+fromData,223
+toData,173
+
+org/apache/geode/internal/cache/tx/RemotePutAllMessage$PutAllReplyMessage,2
+fromData,17
+toData,14
+
+org/apache/geode/internal/cache/tx/RemotePutMessage,2
+fromData,223
+toData,252
+
+org/apache/geode/internal/cache/tx/RemotePutMessage$PutReplyMessage,2
+fromData,81
+toData,94
+
+org/apache/geode/internal/cache/tx/RemoteRemoveAllMessage,2
+fromData,203
+toData,173
+
+org/apache/geode/internal/cache/tx/RemoteRemoveAllMessage$RemoveAllReplyMessage,2
+fromData,17
+toData,14
+
+org/apache/geode/internal/cache/tx/RemoteSizeMessage,2
+fromData,18
+toData,18
+
+org/apache/geode/internal/cache/tx/RemoteSizeMessage$SizeReplyMessage,2
+fromData,16
+toData,16
+
 org/apache/geode/internal/cache/versions/RVVException,2
 fromData,1
 toData,14
@@ -2000,14 +2004,6 @@ org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperat
 fromData,32
 toData,35
 
-org/apache/geode/internal/cache/wan/parallel/ParallelQueueBatchRemovalMessage,2
-fromData,17
-toData,14
-
-org/apache/geode/internal/cache/wan/parallel/ParallelQueueBatchRemovalMessage$BatchRemovalReplyMessage,2
-fromData,6
-toData,6
-
 org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage,2
 fromData,14
 toData,14
@@ -2028,10 +2024,6 @@ org/apache/geode/internal/modules/util/RegionConfiguration,2
 fromData,77
 toData,74
 
-org/apache/geode/internal/security/ObjectWithAuthz,2
-fromData,17
-toData,17
-
 org/apache/geode/internal/util/VersionedArrayList,2
 fromData,85
 toData,88
@@ -2069,8 +2061,8 @@ fromData,41
 toData,41
 
 org/apache/geode/management/internal/cli/functions/CliFunctionResult,4
-fromData,61
-fromDataPre_GFE_8_0_0_0,34
+fromData,57
+fromDataPre_GFE_8_0_0_0,30
 toData,49
 toDataPre_GFE_8_0_0_0,25
 
@@ -2136,8 +2128,3 @@ org/apache/geode/redis/internal/DoubleWrapper,2
 fromData,9
 toData,9
 
-org/apache/geode/internal/cache/RemoveCacheServerProfileMessage,2
-fromData,16
-toData,16
-
-
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneResultStructImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneResultStructImpl.java
index 25a6a78..3675df8 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneResultStructImpl.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneResultStructImpl.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import org.apache.geode.DataSerializer;
 import org.apache.geode.cache.lucene.LuceneResultStruct;
 import org.apache.geode.internal.DataSerializableFixedID;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.Version;
 
 public class LuceneResultStructImpl<K, V>
@@ -111,8 +112,8 @@ public class LuceneResultStructImpl<K, V>
 
   @Override
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
-    key = DataSerializer.readObject(in);
-    value = DataSerializer.readObject(in);
+    key = InternalDataSerializer.readUserObject(in);
+    value = InternalDataSerializer.readUserObject(in);
     score = in.readFloat();
   }
 
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/EntryScore.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/EntryScore.java
index 788e101..0217f92 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/EntryScore.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/EntryScore.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import org.apache.geode.DataSerializer;
 import org.apache.geode.internal.DataSerializableFixedID;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.Version;
 
 /**
@@ -73,7 +74,7 @@ public class EntryScore<K> implements DataSerializableFixedID {
 
   @Override
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
-    key = DataSerializer.readObject(in);
+    key = InternalDataSerializer.readUserObject(in);
     score = in.readFloat();
   }
 }
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/results/PageEntry.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/results/PageEntry.java
index 294e7e7..abd247c 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/results/PageEntry.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/results/PageEntry.java
@@ -19,6 +19,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.geode.DataSerializer;
+import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.cache.CachedDeserializable;
 import org.apache.geode.internal.offheap.StoredObject;
 
@@ -66,8 +67,8 @@ public class PageEntry {
   }
 
   public void fromData(final DataInput in) throws IOException, ClassNotFoundException {
-    key = DataSerializer.readObject(in);
-    value = DataSerializer.readObject(in);
+    key = InternalDataSerializer.readUserObject(in);
+    value = InternalDataSerializer.readUserObject(in);
   }
 
   @Override

-- 
To stop receiving notification emails like this one, please contact
bschuchardt@apache.org.