You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2018/09/07 23:52:34 UTC

[geode] 01/01: GEODE-5705: Improve basicReadObject to use switch statement rather than if statements.

This is an automated email from the ASF dual-hosted git repository.

udo pushed a commit to branch feature/GEODE-5705
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 4214a601bcc273f8950dfa5c5e9543be5cef8e4c
Author: Udo Kohlmeyer <uk...@pivotal.io>
AuthorDate: Thu Sep 6 17:14:03 2018 -0700

    GEODE-5705: Improve basicReadObject to use switch statement rather
    than if statements.
---
 .../geode/internal/InternalDataSerializer.java     | 1671 +++++++++-----------
 .../apache/geode/internal/util/DscodeHelper.java   |   19 +
 .../java/org/apache/geode/internal/DSCODETest.java |   10 +
 3 files changed, 799 insertions(+), 901 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java b/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
index 72a1bc0..fc3722a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
@@ -106,6 +106,7 @@ import org.apache.geode.internal.lang.ClassUtils;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LocalizedMessage;
 import org.apache.geode.internal.logging.log4j.LogMarker;
+import org.apache.geode.internal.util.DscodeHelper;
 import org.apache.geode.internal.util.concurrent.CopyOnWriteHashMap;
 import org.apache.geode.pdx.NonPortableClassException;
 import org.apache.geode.pdx.PdxInstance;
@@ -127,19 +128,24 @@ import org.apache.geode.pdx.internal.TypeRegistry;
  * Contains static methods for data serializing instances of internal GemFire classes. It also
  * contains the implementation of the distribution messaging (and shared memory management) needed
  * to support data serialization.
- *
  * @since GemFire 3.5
  */
 public abstract class InternalDataSerializer extends DataSerializer {
+  // array is null
+  public static final byte NULL_ARRAY = -1;
+  /**
+   * array len encoded as int in next 4 bytes
+   * @since GemFire 5.7
+   */
+  public static final byte INT_ARRAY_LEN = -3;
+  public static final boolean LOAD_CLASS_EACH_TIME =
+      Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "loadClassOnEveryDeserialization");
   private static final Logger logger = LogService.getLogger();
-
   /**
    * Maps Class names to their DataSerializer. This is used to find a DataSerializer during
    * serialization.
    */
   private static final Map<String, DataSerializer> classesToSerializers = new ConcurrentHashMap<>();
-
-
   /**
    * This list contains classes that Geode's classes subclass, such as antlr AST classes which are
    * used by our Object Query Language. It also contains certain classes that are DataSerializable
@@ -191,27 +197,84 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
           // geode-modules
           + ";org.apache.geode.modules.util.SessionCustomExpiry" + ";";
-
-
+  private static final String serializationVersionTxt =
+      System.getProperty(DistributionConfig.GEMFIRE_PREFIX + "serializationVersion");
+  /**
+   * Change this constant to be the last one in SERIALIZATION_VERSION
+   */
+  private static final SERIALIZATION_VERSION latestVersion = SERIALIZATION_VERSION.v662;
+  private static final SERIALIZATION_VERSION serializationVersion = calculateSerializationVersion();
+  /**
+   * Maps the id of a serializer to its {@code DataSerializer}.
+   */
+  private static final ConcurrentMap/* <Integer, DataSerializer|Marker> */ idsToSerializers =
+      new ConcurrentHashMap();
+  /**
+   * Contains the classnames of the data serializers (and not the supported classes) not yet loaded
+   * into the vm as keys and their corresponding holder instances as values.
+   */
+  private static final ConcurrentHashMap<String, SerializerAttributesHolder> dsClassesToHolders =
+      new ConcurrentHashMap<>();
+  /**
+   * Contains the id of the data serializers not yet loaded into the vm as keys and their
+   * corresponding holder instances as values.
+   */
+  private static final ConcurrentHashMap<Integer, SerializerAttributesHolder> idsToHolders =
+      new ConcurrentHashMap<>();
+  /**
+   * Contains the classnames of supported classes as keys and their corresponding
+   * SerializerAttributesHolder instances as values. This applies only to the data serializers which
+   * have not been loaded into the vm.
+   */
+  private static final ConcurrentHashMap<String, SerializerAttributesHolder>
+      supportedClassesToHolders =
+      new ConcurrentHashMap<>();
+  private static final Object listenersSync = new Object();
+  private static final byte TIME_UNIT_NANOSECONDS = -1;
+  private static final byte TIME_UNIT_MICROSECONDS = -2;
+  private static final byte TIME_UNIT_MILLISECONDS = -3;
+  private static final byte TIME_UNIT_SECONDS = -4;
+  private static final ConcurrentMap dsfidToClassMap =
+      logger.isTraceEnabled(LogMarker.SERIALIZER_WRITE_DSFID_VERBOSE) ? new ConcurrentHashMap()
+          : null;
+  /**
+   * array len encoded as unsigned short in next 2 bytes
+   * @since GemFire 5.7
+   */
+  private static final byte SHORT_ARRAY_LEN = -2;
+  private static final int MAX_BYTE_ARRAY_LEN = (byte) -4 & 0xFF;
+  private static final ThreadLocal<Boolean> pdxSerializationInProgress = new ThreadLocal<>();
+  // Variable Length long encoded as int in next 4 bytes
+  private static final byte INT_VL = 126;
+  // Variable Length long encoded as long in next 8 bytes
+  private static final byte LONG_VL = 127;
+  private static final int MAX_BYTE_VL = 125;
+  private static final CopyOnWriteHashMap<String, WeakReference<Class<?>>> classCache =
+      LOAD_CLASS_EACH_TIME ? null : new CopyOnWriteHashMap<>();
+  private static final Object cacheAccessLock = new Object();
   private static InputStreamFilter defaultSerializationFilter = new EmptyInputStreamFilter();
-
   /**
    * A deserialization filter for ObjectInputStreams
    */
   private static InputStreamFilter serializationFilter = defaultSerializationFilter;
-
-  private static final String serializationVersionTxt =
-      System.getProperty(DistributionConfig.GEMFIRE_PREFIX + "serializationVersion");
-
   /**
    * support for old GemFire clients and WAN sites - needed to enable moving from GemFire to Geode
    */
   private static OldClientSupportService oldClientSupportService;
+  /**
+   * {@code RegistrationListener}s that receive callbacks when {@code DataSerializer}s and {@code
+   * Instantiator}s are registered. Note: copy-on-write access used for this set
+   */
+  private static volatile Set listeners = new HashSet();
+  private static DataSerializer dvddeserializer;
+
+  static {
+    initializeWellKnownSerializers();
+  }
 
   /**
    * For backward compatibility we must swizzle the package of some classes that had to be moved
    * when GemFire was open- sourced. This preserves backward-compatibility.
-   *
    * @param name the fully qualified class name
    * @return the name of the class in this implementation
    */
@@ -233,7 +296,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
   /**
    * For backward compatibility we must swizzle the package of some classes that had to be moved
    * when GemFire was open- sourced. This preserves backward-compatibility.
-   *
    * @param name the fully qualified class name
    * @param out the consumer of the serialized object
    * @return the name of the class in this implementation
@@ -256,12 +318,11 @@ public abstract class InternalDataSerializer extends DataSerializer {
   /**
    * Initializes the optional serialization "accept list" if the user has requested it in the
    * DistributionConfig
-   *
    * @param distributionConfig the DistributedSystem configuration
    * @param services DistributedSystem services that might have classes to acceptlist
    */
   public static void initialize(DistributionConfig distributionConfig,
-      Collection<DistributedSystemService> services) {
+                                Collection<DistributedSystemService> services) {
     logger.info("initializing InternalDataSerializer with {} services", services.size());
     if (distributionConfig.getValidateSerializableObjects()) {
       if (!ClassUtils.isClassAvailable("sun.misc.ObjectInputFilter")
@@ -281,11 +342,9 @@ public abstract class InternalDataSerializer extends DataSerializer {
     serializationFilter = defaultSerializationFilter;
   }
 
-
   /**
    * {@link DistributedSystemService}s that need to acceptlist Serializable objects can use this to
-   * read them from a file and then return them via
-   * {@link DistributedSystemService#getSerializationAcceptlist}
+   * read them from a file and then return them via {@link DistributedSystemService#getSerializationAcceptlist}
    */
   public static Collection<String> loadClassNames(URL sanctionedSerializables) throws IOException {
     Collection<String> result = new ArrayList(1000);
@@ -311,25 +370,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
   }
 
-
-  /**
-   * Any time new serialization format is added then a new enum needs to be added here.
-   *
-   * @since GemFire 6.6.2
-   */
-  private enum SERIALIZATION_VERSION {
-    vINVALID,
-    // includes 6.6.0.x and 6.6.1.x. Note that no serialization changes were made in 6.6 until 6.6.2
-    v660,
-    // 6.6.2.x or later NOTE if you add a new constant make sure and update "latestVersion".
-    v662
-  }
-
-  /**
-   * Change this constant to be the last one in SERIALIZATION_VERSION
-   */
-  private static final SERIALIZATION_VERSION latestVersion = SERIALIZATION_VERSION.v662;
-
   private static SERIALIZATION_VERSION calculateSerializationVersion() {
     if (serializationVersionTxt == null || serializationVersionTxt.isEmpty()) {
       return latestVersion;
@@ -343,8 +383,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
     }
   }
 
-  private static final SERIALIZATION_VERSION serializationVersion = calculateSerializationVersion();
-
   public static boolean is662SerializationEnabled() {
     return serializationVersion.ordinal() >= SERIALIZATION_VERSION.v662.ordinal();
   }
@@ -358,10 +396,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
     }
   }
 
-  static {
-    initializeWellKnownSerializers();
-  }
-
   private static void initializeWellKnownSerializers() {
     // ArrayBlockingQueue does not have zero-arg constructor
     // LinkedBlockingQueue does have zero-arg constructor but no way to get capacity
@@ -769,42 +803,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
   }
 
   /**
-   * Maps the id of a serializer to its {@code DataSerializer}.
-   */
-  private static final ConcurrentMap/* <Integer, DataSerializer|Marker> */ idsToSerializers =
-      new ConcurrentHashMap();
-
-  /**
-   * Contains the classnames of the data serializers (and not the supported classes) not yet loaded
-   * into the vm as keys and their corresponding holder instances as values.
-   */
-  private static final ConcurrentHashMap<String, SerializerAttributesHolder> dsClassesToHolders =
-      new ConcurrentHashMap<>();
-
-  /**
-   * Contains the id of the data serializers not yet loaded into the vm as keys and their
-   * corresponding holder instances as values.
-   */
-  private static final ConcurrentHashMap<Integer, SerializerAttributesHolder> idsToHolders =
-      new ConcurrentHashMap<>();
-
-  /**
-   * Contains the classnames of supported classes as keys and their corresponding
-   * SerializerAttributesHolder instances as values. This applies only to the data serializers which
-   * have not been loaded into the vm.
-   */
-  private static final ConcurrentHashMap<String, SerializerAttributesHolder> supportedClassesToHolders =
-      new ConcurrentHashMap<>();
-
-  /**
-   * {@code RegistrationListener}s that receive callbacks when {@code DataSerializer}s and {@code
-   * Instantiator}s are registered. Note: copy-on-write access used for this set
-   */
-  private static volatile Set listeners = new HashSet();
-
-  private static final Object listenersSync = new Object();
-
-  /**
    * Convert the given unsigned byte to an int. The returned value will be in the range [0..255]
    * inclusive
    */
@@ -812,17 +810,16 @@ public abstract class InternalDataSerializer extends DataSerializer {
     return ub & 0xFF;
   }
 
-  public static void setOldClientSupportService(final OldClientSupportService svc) {
-    oldClientSupportService = svc;
-  }
-
   public static OldClientSupportService getOldClientSupportService() {
     return oldClientSupportService;
   }
 
+  public static void setOldClientSupportService(final OldClientSupportService svc) {
+    oldClientSupportService = svc;
+  }
+
   /**
    * Instantiates an instance of {@code DataSerializer}
-   *
    * @throws IllegalArgumentException If the class can't be instantiated
    * @see DataSerializer#register(Class)
    */
@@ -839,11 +836,11 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
     } catch (NoSuchMethodException ignored) {
       StringId s = LocalizedStrings.DataSerializer_CLASS_0_DOES_NOT_HAVE_A_ZEROARGUMENT_CONSTRUCTOR;
-      Object[] args = new Object[] {c.getName()};
+      Object[] args = new Object[]{c.getName()};
       if (c.getDeclaringClass() != null) {
         s =
             LocalizedStrings.DataSerializer_CLASS_0_DOES_NOT_HAVE_A_ZEROARGUMENT_CONSTRUCTOR_IT_IS_AN_INNER_CLASS_OF_1_SHOULD_IT_BE_A_STATIC_INNER_CLASS;
-        args = new Object[] {c.getName(), c.getDeclaringClass()};
+        args = new Object[]{c.getName(), c.getDeclaringClass()};
       }
       throw new IllegalArgumentException(s.toLocalizedString(args));
     }
@@ -875,7 +872,7 @@ public abstract class InternalDataSerializer extends DataSerializer {
   }
 
   public static DataSerializer register(Class c, boolean distribute, EventID eventId,
-      ClientProxyMembershipID context) {
+                                        ClientProxyMembershipID context) {
     DataSerializer s = newInstance(c);
     // This method is only called when server connection and CacheClientUpdaterThread
     s.setEventId(eventId);
@@ -885,9 +882,8 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
   /**
    * Registers a {@code DataSerializer} instance with the data serialization framework.
-   *
    * @param distribute Should the registered {@code DataSerializer} be distributed to other members
-   *        of the distributed system?
+   * of the distributed system?
    * @see DataSerializer#register(Class)
    */
   public static DataSerializer register(Class c, boolean distribute) {
@@ -945,7 +941,7 @@ public abstract class InternalDataSerializer extends DataSerializer {
           DataSerializer other = (DataSerializer) oldSerializer;
           throw new IllegalStateException(
               LocalizedStrings.InternalDataSerializer_A_DATASERIALIZER_OF_CLASS_0_IS_ALREADY_REGISTERED_WITH_ID_1_SO_THE_DATASERIALIZER_OF_CLASS_2_COULD_NOT_BE_REGISTERED
-                  .toLocalizedString(new Object[] {other.getClass().getName(), other.getId()}));
+                  .toLocalizedString(new Object[]{other.getClass().getName(), other.getId()}));
         }
       }
     } while (retry);
@@ -1018,7 +1014,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
   /**
    * Marks a {@code DataSerializer} className for registration with the data serialization framework
    * if and when it is needed. Does not necessarily load the classes into this VM.
-   *
    * @param className Name of the DataSerializer class.
    * @param distribute If true, distribute this data serializer.
    * @param eventId Event id
@@ -1026,7 +1021,7 @@ public abstract class InternalDataSerializer extends DataSerializer {
    * @see DataSerializer#register(Class)
    */
   public static void register(String className, boolean distribute, EventID eventId,
-      ClientProxyMembershipID proxyId, int id) {
+                              ClientProxyMembershipID proxyId, int id) {
     register(className, distribute,
         new SerializerAttributesHolder(className, eventId, proxyId, id));
   }
@@ -1034,7 +1029,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
   /**
    * Marks a {@code DataSerializer} className for registration with the data serialization
    * framework. Does not necessarily load the classes into this VM.
-   *
    * @param distribute If true, distribute this data serializer.
    * @see DataSerializer#register(Class)
    */
@@ -1043,7 +1037,7 @@ public abstract class InternalDataSerializer extends DataSerializer {
   }
 
   private static void register(String className, boolean distribute,
-      SerializerAttributesHolder holder) {
+                               SerializerAttributesHolder holder) {
     if (StringUtils.isBlank(className)) {
       throw new IllegalArgumentException("Class name cannot be null or empty.");
     }
@@ -1053,7 +1047,7 @@ public abstract class InternalDataSerializer extends DataSerializer {
       if (oldValue.getId() != 0 && holder.getId() != 0 && oldValue.getId() != holder.getId()) {
         throw new IllegalStateException(
             LocalizedStrings.InternalDataSerializer_A_DATASERIALIZER_OF_CLASS_0_IS_ALREADY_REGISTERED_WITH_ID_1_SO_THE_DATASERIALIZER_OF_CLASS_2_COULD_NOT_BE_REGISTERED
-                .toLocalizedString(new Object[] {oldValue.getClass().getName(), oldValue.getId()}));
+                .toLocalizedString(new Object[]{oldValue.getClass().getName(), oldValue.getId()}));
       }
     }
 
@@ -1076,7 +1070,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
    * classes they support. The DataSerializers are registered as "holders" to avoid loading the
    * actual classes until they're needed. This method registers the names of classes supported by
    * the DataSerializers
-   *
    * @param map The classes returned by DataSerializer.supportedClasses()
    */
   public static void updateSupportedClassesMap(Map<Integer, List<String>> map) {
@@ -1097,53 +1090,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
     }
   }
 
-  /**
-   * A SerializerAttributesHolder holds information required to load a DataSerializer and exists to
-   * allow client/server connections to be created more quickly than they would if the
-   * DataSerializer information downloaded from the server were used to immediately load the
-   * corresponding classes.
-   */
-  public static class SerializerAttributesHolder {
-    private String className = "";
-    private EventID eventId = null;
-    private ClientProxyMembershipID proxyId = null;
-    private int id = 0;
-
-    SerializerAttributesHolder() {}
-
-    SerializerAttributesHolder(String name, EventID event, ClientProxyMembershipID proxy, int id) {
-      this.className = name;
-      this.eventId = event;
-      this.proxyId = proxy;
-      this.id = id;
-    }
-
-    /**
-     * @return String the classname of the data serializer this instance represents.
-     */
-    public String getClassName() {
-      return this.className;
-    }
-
-    public EventID getEventId() {
-      return this.eventId;
-    }
-
-    public ClientProxyMembershipID getProxyId() {
-      return this.proxyId;
-    }
-
-    public int getId() {
-      return this.id;
-    }
-
-    @Override
-    public String toString() {
-      return "SerializerAttributesHolder[name=" + this.className + ",id=" + this.id + ",eventId="
-          + this.eventId + ']';
-    }
-  }
-
   private static void sendRegistrationMessageToServers(DataSerializer dataSerializer) {
     PoolManagerImpl.allPoolsRegisterDataSerializers(dataSerializer);
   }
@@ -1178,7 +1124,7 @@ public abstract class InternalDataSerializer extends DataSerializer {
         new ClientDataSerializerMessage(EnumListenerEvent.AFTER_REGISTER_DATASERIALIZER,
             serializedDataSerializer, (ClientProxyMembershipID) dataSerializer.getContext(),
             (EventID) dataSerializer.getEventId(),
-            new Class[][] {dataSerializer.getSupportedClasses()});
+            new Class[][]{dataSerializer.getSupportedClasses()});
     // Deliver it to all the clients
     CacheClientNotifier.routeClientMessage(clientDataSerializerMessage);
   }
@@ -1353,7 +1299,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
   /**
    * Returns all the data serializers in this vm. This method, unlike {@link #getSerializers()},
    * does not force loading of the data serializers which were not loaded in the vm earlier.
-   *
    * @return Array of {@link SerializerAttributesHolder}
    */
   public static SerializerAttributesHolder[] getSerializersForDistribution() {
@@ -1427,7 +1372,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
   /**
    * Read the data from in and register it with this class. TODO: loadRegistrations is unused
-   *
    * @throws IllegalArgumentException if a registration fails
    */
   public static void loadRegistrations(DataInput in) throws IOException {
@@ -1471,7 +1415,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
   /**
    * Alerts all {@code RegistrationListener}s that a new {@code DataSerializer} has been registered
-   *
    * @see InternalDataSerializer.RegistrationListener#newDataSerializer
    */
   private static void fireNewDataSerializer(DataSerializer ds) {
@@ -1483,7 +1426,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
   /**
    * Alerts all {@code RegistrationListener}s that a new {@code Instantiator} has been registered
-   *
    * @see InternalDataSerializer.RegistrationListener#newInstantiator
    */
   static void fireNewInstantiator(Instantiator instantiator) {
@@ -1575,18 +1517,16 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
   /**
    * Data serializes an instance of a well-known class to the given {@code DataOutput}.
-   *
    * @return {@code true} if {@code o} was actually written to {@code out}
    */
   private static boolean writeWellKnownObject(Object o, DataOutput out,
-      boolean ensurePdxCompatibility) throws IOException {
+                                              boolean ensurePdxCompatibility) throws IOException {
     return writeUserObject(o, out, ensurePdxCompatibility);
   }
 
   /**
    * Data serializes an instance of a "user class" (that is, a class that can be handled by a
    * registered {@code DataSerializer}) to the given {@code DataOutput}.
-   *
    * @return {@code true} if {@code o} was written to {@code out}.
    */
   private static boolean writeUserObject(Object o, DataOutput out, boolean ensurePdxCompatibility)
@@ -1670,7 +1610,7 @@ public abstract class InternalDataSerializer extends DataSerializer {
       return true;
     } else if (is662SerializationEnabled()
         && (o.getClass().isEnum()/* for bug 52271 */ || (o.getClass().getSuperclass() != null
-            && o.getClass().getSuperclass().isEnum()))) {
+        && o.getClass().getSuperclass().isEnum()))) {
       if (isPdxSerializationInProgress()) {
         writePdxEnum((Enum<?>) o, out);
       } else {
@@ -1684,7 +1624,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
     }
   }
 
-
   public static boolean autoSerialized(Object o, DataOutput out) throws IOException {
     AutoSerializableManager asm = TypeRegistry.getAutoSerializableManager();
     if (asm != null) {
@@ -1767,7 +1706,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
   /**
    * Reads an object that was serialized by a customer ("user") {@code DataSerializer} from the
    * given {@code DataInput}.
-   *
    * @throws IOException If the serializer that can deserialize the object is not registered.
    */
   private static Object readUserObject(DataInput in, int serializerId)
@@ -1776,7 +1714,7 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
     if (serializer == null) {
       throw new IOException(LocalizedStrings.DataSerializer_SERIALIZER_0_IS_NOT_REGISTERED
-          .toLocalizedString(new Object[] {serializerId}));
+          .toLocalizedString(new Object[]{serializerId}));
     }
 
     return serializer.fromData(in);
@@ -1784,25 +1722,21 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
   /**
    * Checks to make sure a {@code DataOutput} is not {@code null}.
-   *
    * @throws NullPointerException If {@code out} is {@code null}
    */
   public static void checkOut(DataOutput out) {
     if (out == null) {
-      String s = "Null DataOutput";
-      throw new NullPointerException(s);
+      throw new NullPointerException("Null DataOutput");
     }
   }
 
   /**
    * Checks to make sure a {@code DataInput} is not {@code null}.
-   *
    * @throws NullPointerException If {@code in} is {@code null}
    */
   public static void checkIn(DataInput in) {
     if (in == null) {
-      String s = "Null DataInput";
-      throw new NullPointerException(s);
+      throw new NullPointerException("Null DataInput");
     }
   }
 
@@ -1811,7 +1745,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
    * <P>
    * This method is internal because its semantics (that is, its ability to write any kind of {@code
    * Set}) are different from the {@code write}XXX methods of the external {@code DataSerializer}.
-   *
    * @throws IOException A problem occurs while writing to {@code out}
    * @see #readSet
    * @since GemFire 4.0
@@ -1819,29 +1752,27 @@ public abstract class InternalDataSerializer extends DataSerializer {
   public static void writeSet(Collection<?> set, DataOutput out) throws IOException {
     checkOut(out);
 
-    int size;
-    if (set == null) {
-      size = -1;
-    } else {
+    int size = -1;
+    if (set != null) {
       size = set.size();
     }
     writeArrayLength(size, out);
     if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
       logger.trace(LogMarker.SERIALIZER_VERBOSE, "Writing HashSet with {} elements: {}", size, set);
     }
+    // set is null when size == -1, so only iterate when there are elements to write
     if (size > 0) {
       for (Object element : set) {
         writeObject(element, out);
       }
     }
   }
 
   /**
    * Reads a {@code Set} from a {@code DataInput}.
-   *
    * @throws IOException A problem occurs while writing to {@code out}
    * @throws ClassNotFoundException The class of one of the {@code HashSet}'s elements cannot be
-   *         found.
+   * found.
    * @see #writeSet
    * @since GemFire 4.0
    */
@@ -1852,7 +1780,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
   /**
    * Reads a {@code Set} from a {@code DataInput} into the given non-null collection. Returns true
    * if collection read is non-null else returns false. TODO: readCollection is unused
-   *
    * @throws IOException A problem occurs while reading from {@code in}
    * @throws ClassNotFoundException The class of one of the {@code Set}'s elements cannot be found.
    * @see #writeSet
@@ -1879,7 +1806,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
   /**
    * write a set of Long objects
-   *
    * @param set the set of Long objects
    * @param hasLongIDs if false, write only ints, not longs
    * @param out the output stream
@@ -1922,7 +1848,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
   /**
    * write a set of Long objects TODO: writeListOfLongs is unused
-   *
    * @param list the set of Long objects
    * @param hasLongIDs if false, write only ints, not longs
    * @param out the output stream
@@ -1963,7 +1888,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
     }
   }
 
-
   /**
    * Writes the type code for a primitive type Class to {@code DataOutput}.
    */
@@ -1996,48 +1920,38 @@ public abstract class InternalDataSerializer extends DataSerializer {
   }
 
   public static Class decodePrimitiveClass(byte typeCode) {
-    if (typeCode == DSCODE.BOOLEAN_TYPE.toByte()) {
-      return Boolean.TYPE;
-    }
-    if (typeCode == DSCODE.CHARACTER_TYPE.toByte()) {
-      return Character.TYPE;
-    }
-    if (typeCode == DSCODE.BYTE_TYPE.toByte()) {
-      return Byte.TYPE;
-    }
-    if (typeCode == DSCODE.SHORT_TYPE.toByte()) {
-      return Short.TYPE;
-    }
-    if (typeCode == DSCODE.INTEGER_TYPE.toByte()) {
-      return Integer.TYPE;
-    }
-    if (typeCode == DSCODE.LONG_TYPE.toByte()) {
-      return Long.TYPE;
-    }
-    if (typeCode == DSCODE.FLOAT_TYPE.toByte()) {
-      return Float.TYPE;
-    }
-    if (typeCode == DSCODE.DOUBLE_TYPE.toByte()) {
-      return Double.TYPE;
-    }
-    if (typeCode == DSCODE.VOID_TYPE.toByte()) {
-      return Void.TYPE;
-    }
-    if (typeCode == DSCODE.NULL.toByte()) {
-      return null;
+    DSCODE dscode = DscodeHelper.toDSCODE(typeCode);
+    switch (dscode) {
+      case BOOLEAN_TYPE:
+        return Boolean.TYPE;
+      case CHARACTER_TYPE:
+        return Character.TYPE;
+      case BYTE_TYPE:
+        return Byte.TYPE;
+      case SHORT_TYPE:
+        return Short.TYPE;
+      case INTEGER_TYPE:
+        return Integer.TYPE;
+      case LONG_TYPE:
+        return Long.TYPE;
+      case FLOAT_TYPE:
+        return Float.TYPE;
+      case DOUBLE_TYPE:
+        return Double.TYPE;
+      case VOID_TYPE:
+        return Void.TYPE;
+      case NULL:
+        return null;
+      default:
+        throw new InternalGemFireError(
+            LocalizedStrings.InternalDataSerializer_UNEXPECTED_TYPECODE_0
+                .toLocalizedString(typeCode));
     }
-    throw new InternalGemFireError(
-        LocalizedStrings.InternalDataSerializer_UNEXPECTED_TYPECODE_0.toLocalizedString(typeCode));
-  }
 
-  private static final byte TIME_UNIT_NANOSECONDS = -1;
-  private static final byte TIME_UNIT_MICROSECONDS = -2;
-  private static final byte TIME_UNIT_MILLISECONDS = -3;
-  private static final byte TIME_UNIT_SECONDS = -4;
+  }
 
   /**
    * Reads a {@code TimeUnit} from a {@code DataInput}.
-   *
    * @throws IOException A problem occurs while writing to {@code out}
    */
   private static TimeUnit readTimeUnit(DataInput in) throws IOException {
@@ -2146,10 +2060,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
     return result;
   }
 
-  private static final ConcurrentMap dsfidToClassMap =
-      logger.isTraceEnabled(LogMarker.SERIALIZER_WRITE_DSFID_VERBOSE) ? new ConcurrentHashMap()
-          : null;
-
   public static void writeUserDataSerializableHeader(int classId, DataOutput out)
       throws IOException {
     if (classId <= Byte.MAX_VALUE && classId >= Byte.MIN_VALUE) {
@@ -2166,7 +2076,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
   /**
    * Writes given number of characters from array of {@code char}s to a {@code DataOutput}.
-   *
    * @throws IOException A problem occurs while writing to {@code out}
    * @see DataSerializer#readCharArray
    * @since GemFire 6.6
@@ -2190,7 +2099,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
   /**
    * returns true if the byte array is the serialized form of a null reference
-   *
    * @param serializedForm the serialized byte array
    */
   public static boolean isSerializedNull(byte[] serializedForm) {
@@ -2357,7 +2265,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
   /**
    * write an object in java Serializable form with a SERIALIZABLE DSCODE so that it can be
    * deserialized with DataSerializer.readObject()
-   *
    * @param o the object to serialize
    * @param out the data output to serialize to
    */
@@ -2412,7 +2319,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
    * DataSerializable. It will invoke the correct toData method based on the class's version
    * information. This method does not write information about the class of the object. When
    * deserializing use the method invokeFromData to read the contents of the object.
-   *
    * @param ds the object to write
    * @param out the output stream.
    */
@@ -2422,7 +2328,7 @@ public abstract class InternalDataSerializer extends DataSerializer {
       boolean invoked = false;
       Version v = InternalDataSerializer.getVersionForDataStreamOrNull(out);
 
-      if (v != null && v != Version.CURRENT) {
+      if (Version.CURRENT != v && v != null) {
         // get versions where DataOutput was upgraded
         Version[] versions = null;
         if (ds instanceof SerializationVersions) {
@@ -2431,12 +2337,12 @@ public abstract class InternalDataSerializer extends DataSerializer {
         }
         // check if the version of the peer or diskstore is different and
         // there has been a change in the message
-        if (versions != null && versions.length > 0) {
+        if (versions != null) {
           for (Version version : versions) {
             // if peer version is less than the greatest upgraded version
             if (v.compareTo(version) < 0) {
               ds.getClass().getMethod("toDataPre_" + version.getMethodSuffix(),
-                  new Class[] {DataOutput.class}).invoke(ds, out);
+                  new Class[]{DataOutput.class}).invoke(ds, out);
               invoked = true;
               break;
             }
@@ -2486,7 +2392,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
    * DataSerializable. It will invoke the correct fromData method based on the class's version
    * information. This method does not read information about the class of the object. When
    * serializing use the method invokeToData to write the contents of the object.
-   *
    * @param ds the object to write
    * @param in the input stream.
    */
@@ -2495,7 +2400,7 @@ public abstract class InternalDataSerializer extends DataSerializer {
     try {
       boolean invoked = false;
       Version v = InternalDataSerializer.getVersionForDataStreamOrNull(in);
-      if (v != null && v != Version.CURRENT) {
+      if (Version.CURRENT != v && v != null) {
         // get versions where DataOutput was upgraded
         Version[] versions = null;
         if (ds instanceof SerializationVersions) {
@@ -2504,12 +2409,12 @@ public abstract class InternalDataSerializer extends DataSerializer {
         }
         // check if the version of the peer or diskstore is different and
         // there has been a change in the message
-        if (versions != null && versions.length > 0) {
+        if (versions != null) {
           for (Version version : versions) {
             // if peer version is less than the greatest upgraded version
             if (v.compareTo(version) < 0) {
               ds.getClass().getMethod("fromDataPre" + '_' + version.getMethodSuffix(),
-                  new Class[] {DataInput.class}).invoke(ds, in);
+                  new Class[]{DataInput.class}).invoke(ds, in);
               invoked = true;
               break;
             }
@@ -2534,7 +2439,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
     }
   }
 
-
   private static Object readDataSerializable(final DataInput in)
       throws IOException, ClassNotFoundException {
     Class c = readClass(in);
@@ -2642,25 +2546,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
     }
   }
 
-  // array is null
-  public static final byte NULL_ARRAY = -1;
-
-  /**
-   * array len encoded as unsigned short in next 2 bytes
-   *
-   * @since GemFire 5.7
-   */
-  private static final byte SHORT_ARRAY_LEN = -2;
-
-  /**
-   * array len encoded as int in next 4 bytes
-   *
-   * @since GemFire 5.7
-   */
-  public static final byte INT_ARRAY_LEN = -3;
-
-  private static final int MAX_BYTE_ARRAY_LEN = (byte) -4 & 0xFF;
-
   public static void writeArrayLength(int len, DataOutput out) throws IOException {
     if (len == -1) {
       out.writeByte(NULL_ARRAY);
@@ -2683,9 +2568,9 @@ public abstract class InternalDataSerializer extends DataSerializer {
       int result = ubyteToInt(code);
       if (result > MAX_BYTE_ARRAY_LEN) {
         if (code == SHORT_ARRAY_LEN) {
-          result = in.readUnsignedShort();
+          return in.readUnsignedShort();
         } else if (code == INT_ARRAY_LEN) {
-          result = in.readInt();
+          return in.readInt();
         } else {
           throw new IllegalStateException("unexpected array length code=" + code);
         }
@@ -2694,114 +2579,119 @@ public abstract class InternalDataSerializer extends DataSerializer {
     }
   }
 
-  /**
-   * Serializes a list of Integers. The argument may be null. Deserialize with
-   * readListOfIntegers().
-   *
-   * TODO: writeListOfIntegers is unused
-   */
-  public void writeListOfIntegers(List<Integer> list, DataOutput out) throws IOException {
-    int size;
-    if (list == null) {
-      size = -1;
-    } else {
-      size = list.size();
-    }
-    InternalDataSerializer.writeArrayLength(size, out);
-    if (size > 0) {
-      for (int i = 0; i < size; i++) {
-        out.writeInt(list.get(i));
-      }
+  private static Object readDSFID(final DataInput in, DSCODE dscode)
+      throws IOException, ClassNotFoundException {
+    if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
+      logger.trace(LogMarker.SERIALIZER_VERBOSE, "readDSFID: header={}", dscode);
+    }
+    switch (dscode) {
+      case DS_FIXED_ID_BYTE:
+        return DSFIDFactory.create(in.readByte(), in);
+      case DS_FIXED_ID_SHORT:
+        return DSFIDFactory.create(in.readShort(), in);
+      case DS_NO_FIXED_ID:
+        return readDataSerializableFixedID(in);
+      case DS_FIXED_ID_INT:
+        return DSFIDFactory.create(in.readInt(), in);
+      default:
+        throw new IllegalStateException("unexpected byte: " + dscode + " while reading dsfid");
     }
   }
 
   public static Object readDSFID(final DataInput in) throws IOException, ClassNotFoundException {
     checkIn(in);
-    byte header = in.readByte();
-    if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
-      logger.trace(LogMarker.SERIALIZER_VERBOSE, "readDSFID: header={}", header);
-    }
-    if (header == DSCODE.DS_FIXED_ID_BYTE.toByte()) {
-      return DSFIDFactory.create(in.readByte(), in);
-    } else if (header == DSCODE.DS_FIXED_ID_SHORT.toByte()) {
-      return DSFIDFactory.create(in.readShort(), in);
-    } else if (header == DSCODE.DS_NO_FIXED_ID.toByte()) {
-      return readDataSerializableFixedID(in);
-    } else if (header == DSCODE.DS_FIXED_ID_INT.toByte()) {
-      return DSFIDFactory.create(in.readInt(), in);
-    } else {
-      throw new IllegalStateException("unexpected byte: " + header + " while reading dsfid");
+    return readDSFID(in, DscodeHelper.toDSCODE(in.readByte()));
+  }
+
+  /**
+   * Reads the fixed id that follows a DSFID header code which has already been consumed.
+   * @param in the input to read the id from
+   * @param dscode the header code previously read from {@code in}
+   * @return the id that was read, or {@code Integer.MAX_VALUE} for {@code DS_NO_FIXED_ID}
+   * @throws IOException A problem occurs while reading from {@code in}
+   * @throws IllegalStateException if {@code dscode} is not one of the DSFID header codes
+   */
+  private static int readDSFIDHeader(final DataInput in, DSCODE dscode) throws IOException {
+    switch (dscode) {
+      case DS_FIXED_ID_BYTE:
+        return in.readByte();
+      case DS_FIXED_ID_SHORT:
+        return in.readShort();
+      case DS_NO_FIXED_ID:
+        // No id is read for this header. The if/else chain this switch replaces returned
+        // Integer.MAX_VALUE here; keep that sentinel so the refactoring does not change behavior.
+        return Integer.MAX_VALUE;
+      case DS_FIXED_ID_INT:
+        return in.readInt();
+      default:
+        throw new IllegalStateException("unexpected byte: " + dscode + " while reading dsfid");
     }
   }
 
   public static int readDSFIDHeader(final DataInput in) throws IOException {
     checkIn(in);
-    byte header = in.readByte();
-    if (header == DSCODE.DS_FIXED_ID_BYTE.toByte()) {
-      return in.readByte();
-    } else if (header == DSCODE.DS_FIXED_ID_SHORT.toByte()) {
-      return in.readShort();
-    } else if (header == DSCODE.DS_NO_FIXED_ID.toByte()) {
-      // is that correct??
-      return Integer.MAX_VALUE;
-    } else if (header == DSCODE.DS_FIXED_ID_INT.toByte()) {
-      return in.readInt();
-    } else {
-      throw new IllegalStateException("unexpected byte: " + header + " while reading dsfid");
-    }
+    return readDSFIDHeader(in, DscodeHelper.toDSCODE(in.readByte()));
   }
 
   /**
    * Reads an instance of {@code String} from a {@code DataInput} given the header byte already
    * being read. The return value may be {@code null}.
-   *
    * @throws IOException A problem occurs while reading from {@code in}
    * @since GemFire 5.7
    */
-  public static String readString(DataInput in, byte header) throws IOException {
-    if (header == DSCODE.STRING_BYTES.toByte()) {
-      int len = in.readUnsignedShort();
-      if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
-        logger.trace(LogMarker.SERIALIZER_VERBOSE, "Reading STRING_BYTES of len={}", len);
-      }
-      byte[] buf = new byte[len];
-      in.readFully(buf, 0, len);
-      return new String(buf, 0); // intentionally using deprecated constructor
-    } else if (header == DSCODE.STRING.toByte()) {
-      if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
-        logger.trace(LogMarker.SERIALIZER_VERBOSE, "Reading utf STRING");
-      }
-      return in.readUTF();
-    } else if (header == DSCODE.NULL_STRING.toByte()) {
-      if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
-        logger.trace(LogMarker.SERIALIZER_VERBOSE, "Reading NULL_STRING");
-      }
-      return null;
-    } else if (header == DSCODE.HUGE_STRING_BYTES.toByte()) {
-      int len = in.readInt();
-      if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
-        logger.trace(LogMarker.SERIALIZER_VERBOSE, "Reading HUGE_STRING_BYTES of len={}", len);
-      }
-      byte[] buf = new byte[len];
-      in.readFully(buf, 0, len);
-      return new String(buf, 0); // intentionally using deprecated constructor
-    } else if (header == DSCODE.HUGE_STRING.toByte()) {
-      int len = in.readInt();
-      if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
-        logger.trace(LogMarker.SERIALIZER_VERBOSE, "Reading HUGE_STRING of len={}", len);
-      }
-      char[] buf = new char[len];
-      for (int i = 0; i < len; i++) {
-        buf[i] = in.readChar();
+  private static String readString(DataInput in, DSCODE dscode) throws IOException {
+    switch (dscode) {
+      case STRING_BYTES:
+        return readStringBytesFromDataInput(in, in.readUnsignedShort());
+      case STRING:
+        return readStringUTFFromDataInput(in);
+      case NULL_STRING: {
+        if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
+          logger.trace(LogMarker.SERIALIZER_VERBOSE, "Reading NULL_STRING");
+        }
+        return null;
       }
-      return new String(buf);
-    } else {
-      String s = "Unknown String header " + header;
-      throw new IOException(s);
+      case HUGE_STRING_BYTES:
+        return readStringBytesFromDataInput(in, in.readInt());
+      case HUGE_STRING:
+        return readHugeStringFromDataInput(in);
+      default:
+        throw new IOException("Unknown String header " + dscode);
     }
   }
 
-  private static DataSerializer dvddeserializer;
+  private static String readHugeStringFromDataInput(DataInput in) throws IOException {
+    final int charCount = in.readInt();
+    if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
+      logger.trace(LogMarker.SERIALIZER_VERBOSE, "Reading HUGE_STRING of len={}", charCount);
+    }
+    final char[] chars = new char[charCount];
+    for (int index = 0; index < chars.length; index++) {
+      chars[index] = in.readChar();
+    }
+    return String.valueOf(chars);
+  }
+
+  private static String readStringUTFFromDataInput(DataInput in) throws IOException {
+    if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
+      logger.trace(LogMarker.SERIALIZER_VERBOSE, "Reading utf STRING");
+    }
+    return in.readUTF();
+  }
+
+  private static String readStringBytesFromDataInput(DataInput dataInput, int len)
+      throws IOException {
+    if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
+      logger.trace(LogMarker.SERIALIZER_VERBOSE, "Reading STRING_BYTES of len={}", len);
+    }
+    byte[] buf = new byte[len];
+    dataInput.readFully(buf, 0, len);
+    return new String(buf, 0); // intentionally using deprecated constructor
+  }
+
+  public static String readString(DataInput in, byte header) throws IOException {
+    return readString(in, DscodeHelper.toDSCODE(header));
+  }
 
   // TODO: registerDVDDeserializer is unused
   public static void registerDVDDeserializer(DataSerializer dvddeslzr) {
@@ -2810,7 +2688,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
   /**
    * Just like readObject but make sure and pdx deserialized is not a PdxInstance.
-   *
    * @since GemFire 6.6.2
    */
   public static <T> T readNonPdxInstanceObject(final DataInput in)
@@ -2834,260 +2711,207 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
     // Read the header byte
     byte header = in.readByte();
+    DSCODE headerDSCode = DscodeHelper.toDSCODE(header);
+
     if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
       logger.trace(LogMarker.SERIALIZER_VERBOSE, "basicReadObject: header={}", header);
     }
-    if (header == DSCODE.DS_FIXED_ID_BYTE.toByte()) {
-      return DSFIDFactory.create(in.readByte(), in);
-    }
-    if (header == DSCODE.DS_FIXED_ID_SHORT.toByte()) {
-      return DSFIDFactory.create(in.readShort(), in);
-    }
-    if (header == DSCODE.DS_FIXED_ID_INT.toByte()) {
-      return DSFIDFactory.create(in.readInt(), in);
-    }
-    if (header == DSCODE.DS_NO_FIXED_ID.toByte()) {
-      return readDataSerializableFixedID(in);
-    }
-    if (header == DSCODE.NULL.toByte()) {
-      return null;
-    }
-    if (header == DSCODE.NULL_STRING.toByte() || header == DSCODE.STRING.toByte()
-        || header == DSCODE.HUGE_STRING.toByte() || header == DSCODE.STRING_BYTES.toByte()
-        || header == DSCODE.HUGE_STRING_BYTES.toByte()) {
-      return readString(in, header);
-    }
-    if (header == DSCODE.CLASS.toByte()) {
-      return readClass(in);
-    }
-    if (header == DSCODE.DATE.toByte()) {
-      return readDate(in);
-    }
-    if (header == DSCODE.FILE.toByte()) {
-      return readFile(in);
-    }
-    if (header == DSCODE.INET_ADDRESS.toByte()) {
-      return readInetAddress(in);
-    }
-    if (header == DSCODE.BOOLEAN.toByte()) {
-      return readBoolean(in);
-    }
-    if (header == DSCODE.CHARACTER.toByte()) {
-      return readCharacter(in);
-    }
-    if (header == DSCODE.BYTE.toByte()) {
-      return readByte(in);
-    }
-    if (header == DSCODE.SHORT.toByte()) {
-      return readShort(in);
-    }
-    if (header == DSCODE.INTEGER.toByte()) {
-      return readInteger(in);
-    }
-    if (header == DSCODE.LONG.toByte()) {
-      return readLong(in);
-    }
-    if (header == DSCODE.FLOAT.toByte()) {
-      return readFloat(in);
-    }
-    if (header == DSCODE.DOUBLE.toByte()) {
-      return readDouble(in);
-    }
-    if (header == DSCODE.BYTE_ARRAY.toByte()) {
-      return readByteArray(in);
-    }
-    if (header == DSCODE.ARRAY_OF_BYTE_ARRAYS.toByte()) {
-      return readArrayOfByteArrays(in);
-    }
-    if (header == DSCODE.SHORT_ARRAY.toByte()) {
-      return readShortArray(in);
-    }
-    if (header == DSCODE.STRING_ARRAY.toByte()) {
-      return readStringArray(in);
-    }
-    if (header == DSCODE.INT_ARRAY.toByte()) {
-      return readIntArray(in);
-    }
-    if (header == DSCODE.LONG_ARRAY.toByte()) {
-      return readLongArray(in);
-    }
-    if (header == DSCODE.FLOAT_ARRAY.toByte()) {
-      return readFloatArray(in);
-    }
-    if (header == DSCODE.DOUBLE_ARRAY.toByte()) {
-      return readDoubleArray(in);
-    }
-    if (header == DSCODE.BOOLEAN_ARRAY.toByte()) {
-      return readBooleanArray(in);
-    }
-    if (header == DSCODE.CHAR_ARRAY.toByte()) {
-      return readCharArray(in);
-    }
-    if (header == DSCODE.OBJECT_ARRAY.toByte()) {
-      return readObjectArray(in);
-    }
-    if (header == DSCODE.ARRAY_LIST.toByte()) {
-      return readArrayList(in);
-    }
-    if (header == DSCODE.LINKED_LIST.toByte()) {
-      return readLinkedList(in);
-    }
-    if (header == DSCODE.HASH_SET.toByte()) {
-      return readHashSet(in);
-    }
-    if (header == DSCODE.LINKED_HASH_SET.toByte()) {
-      return readLinkedHashSet(in);
-    }
-    if (header == DSCODE.HASH_MAP.toByte()) {
-      return readHashMap(in);
-    }
-    if (header == DSCODE.IDENTITY_HASH_MAP.toByte()) {
-      return readIdentityHashMap(in);
-    }
-    if (header == DSCODE.HASH_TABLE.toByte()) {
-      return readHashtable(in);
-    }
-    if (header == DSCODE.CONCURRENT_HASH_MAP.toByte()) {
-      return readConcurrentHashMap(in);
-    }
-    if (header == DSCODE.PROPERTIES.toByte()) {
-      return readProperties(in);
-    }
-    if (header == DSCODE.TIME_UNIT.toByte()) {
-      return readTimeUnit(in);
-    }
-    if (header == DSCODE.USER_CLASS.toByte()) {
-      return readUserObject(in, in.readByte());
-    }
-    if (header == DSCODE.USER_CLASS_2.toByte()) {
-      return readUserObject(in, in.readShort());
-    }
-    if (header == DSCODE.USER_CLASS_4.toByte()) {
-      return readUserObject(in, in.readInt());
-    }
-    if (header == DSCODE.VECTOR.toByte()) {
-      return readVector(in);
-    }
-    if (header == DSCODE.STACK.toByte()) {
-      return readStack(in);
-    }
-    if (header == DSCODE.TREE_MAP.toByte()) {
-      return readTreeMap(in);
-    }
-    if (header == DSCODE.TREE_SET.toByte()) {
-      return readTreeSet(in);
-    }
-    if (header == DSCODE.BOOLEAN_TYPE.toByte()) {
-      return Boolean.TYPE;
-    }
-    if (header == DSCODE.CHARACTER_TYPE.toByte()) {
-      return Character.TYPE;
-    }
-    if (header == DSCODE.BYTE_TYPE.toByte()) {
-      return Byte.TYPE;
-    }
-    if (header == DSCODE.SHORT_TYPE.toByte()) {
-      return Short.TYPE;
-    }
-    if (header == DSCODE.INTEGER_TYPE.toByte()) {
-      return Integer.TYPE;
-    }
-    if (header == DSCODE.LONG_TYPE.toByte()) {
-      return Long.TYPE;
-    }
-    if (header == DSCODE.FLOAT_TYPE.toByte()) {
-      return Float.TYPE;
-    }
-    if (header == DSCODE.DOUBLE_TYPE.toByte()) {
-      return Double.TYPE;
-    }
-    if (header == DSCODE.VOID_TYPE.toByte()) {
-      return Void.TYPE;
-    }
-    if (header == DSCODE.USER_DATA_SERIALIZABLE.toByte()) {
-      return readUserDataSerializable(in, in.readByte());
-    }
-    if (header == DSCODE.USER_DATA_SERIALIZABLE_2.toByte()) {
-      return readUserDataSerializable(in, in.readShort());
-    }
-    if (header == DSCODE.USER_DATA_SERIALIZABLE_4.toByte()) {
-      return readUserDataSerializable(in, in.readInt());
-    }
-    if (header == DSCODE.DATA_SERIALIZABLE.toByte()) {
-      return readDataSerializable(in);
+
+    switch (headerDSCode) {
+      case DS_FIXED_ID_BYTE:
+        return DSFIDFactory.create(in.readByte(), in);
+      case DS_FIXED_ID_SHORT:
+        return DSFIDFactory.create(in.readShort(), in);
+      case DS_FIXED_ID_INT:
+        return DSFIDFactory.create(in.readInt(), in);
+      case DS_NO_FIXED_ID:
+        return readDataSerializableFixedID(in);
+      case NULL:
+        return null;
+      case NULL_STRING:
+        return null;
+      case STRING:
+        return readStringUTFFromDataInput(in);
+      case HUGE_STRING:
+        return readHugeStringFromDataInput(in);
+      case STRING_BYTES:
+        return readStringBytesFromDataInput(in, in.readUnsignedShort());
+      case HUGE_STRING_BYTES:
+        return readStringBytesFromDataInput(in, in.readInt());
+      case CLASS:
+        return readClass(in);
+      case DATE:
+        return readDate(in);
+      case FILE:
+        return readFile(in);
+      case INET_ADDRESS:
+        return readInetAddress(in);
+      case BOOLEAN:
+        return readBoolean(in);
+      case CHARACTER:
+        return readCharacter(in);
+      case BYTE:
+        return readByte(in);
+      case SHORT:
+        return readShort(in);
+      case INTEGER:
+        return readInteger(in);
+      case LONG:
+        return readLong(in);
+      case FLOAT:
+        return readFloat(in);
+      case DOUBLE:
+        return readDouble(in);
+      case BYTE_ARRAY:
+        return readByteArray(in);
+      case ARRAY_OF_BYTE_ARRAYS:
+        return readArrayOfByteArrays(in);
+      case SHORT_ARRAY:
+        return readShortArray(in);
+      case STRING_ARRAY:
+        return readStringArray(in);
+      case INT_ARRAY:
+        return readIntArray(in);
+      case LONG_ARRAY:
+        return readLongArray(in);
+      case FLOAT_ARRAY:
+        return readFloatArray(in);
+      case DOUBLE_ARRAY:
+        return readDoubleArray(in);
+      case BOOLEAN_ARRAY:
+        return readBooleanArray(in);
+      case CHAR_ARRAY:
+        return readCharArray(in);
+      case OBJECT_ARRAY:
+        return readObjectArray(in);
+      case ARRAY_LIST:
+        return readArrayList(in);
+      case LINKED_LIST:
+        return readLinkedList(in);
+      case HASH_SET:
+        return readHashSet(in);
+      case LINKED_HASH_SET:
+        return readLinkedHashSet(in);
+      case HASH_MAP:
+        return readHashMap(in);
+      case IDENTITY_HASH_MAP:
+        return readIdentityHashMap(in);
+      case HASH_TABLE:
+        return readHashtable(in);
+      case CONCURRENT_HASH_MAP:
+        return readConcurrentHashMap(in);
+      case PROPERTIES:
+        return readProperties(in);
+      case TIME_UNIT:
+        return readTimeUnit(in);
+      case USER_CLASS:
+        return readUserObject(in, in.readByte());
+      case USER_CLASS_2:
+        return readUserObject(in, in.readShort());
+      case USER_CLASS_4:
+        return readUserObject(in, in.readInt());
+      case VECTOR:
+        return readVector(in);
+      case STACK:
+        return readStack(in);
+      case TREE_MAP:
+        return readTreeMap(in);
+      case TREE_SET:
+        return readTreeSet(in);
+      case BOOLEAN_TYPE:
+        return Boolean.TYPE;
+      case CHARACTER_TYPE:
+        return Character.TYPE;
+      case BYTE_TYPE:
+        return Byte.TYPE;
+      case SHORT_TYPE:
+        return Short.TYPE;
+      case INTEGER_TYPE:
+        return Integer.TYPE;
+      case LONG_TYPE:
+        return Long.TYPE;
+      case FLOAT_TYPE:
+        return Float.TYPE;
+      case DOUBLE_TYPE:
+        return Double.TYPE;
+      case VOID_TYPE:
+        return Void.TYPE;
+      case USER_DATA_SERIALIZABLE:
+        return readUserDataSerializable(in, in.readByte());
+      case USER_DATA_SERIALIZABLE_2:
+        return readUserDataSerializable(in, in.readShort());
+      case USER_DATA_SERIALIZABLE_4:
+        return readUserDataSerializable(in, in.readInt());
+      case DATA_SERIALIZABLE:
+        return readDataSerializable(in);
+      case SERIALIZABLE:
+        return readSerializable(in);
+      case PDX:
+        return readPdxSerializable(in);
+      case PDX_ENUM:
+        return readPdxEnum(in);
+      case GEMFIRE_ENUM:
+        return readGemFireEnum(in);
+      case PDX_INLINE_ENUM:
+        return readPdxInlineEnum(in);
+      case BIG_INTEGER:
+        return readBigInteger(in);
+      case BIG_DECIMAL:
+        return readBigDecimal(in);
+      case UUID:
+        return readUUID(in);
+      case TIMESTAMP:
+        return readTimestamp(in);
+      default:
+        throw new IOException("Unknown header byte: " + header);
     }
-    if (header == DSCODE.SERIALIZABLE.toByte()) {
-      final boolean isDebugEnabled_SERIALIZER = logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE);
-      Object serializableResult;
-      if (in instanceof DSObjectInputStream) {
-        serializableResult = ((DSObjectInputStream) in).readObject();
+  }
+
+  private static Serializable readSerializable(DataInput in)
+      throws IOException, ClassNotFoundException {
+    final boolean isDebugEnabled_SERIALIZER = logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE);
+    Serializable serializableResult;
+    if (in instanceof DSObjectInputStream) {
+      serializableResult = (Serializable) ((DSObjectInputStream) in).readObject();
+    } else {
+      InputStream stream;
+      if (in instanceof InputStream) {
+        stream = (InputStream) in;
       } else {
-        InputStream stream;
-        if (in instanceof InputStream) {
-          stream = (InputStream) in;
-        } else {
-          stream = new InputStream() {
-            @Override
-            public int read() throws IOException {
-              try {
-                return in.readUnsignedByte(); // fix for bug 47249
-              } catch (EOFException ignored) {
-                return -1;
-              }
+        stream = new InputStream() {
+          @Override
+          public int read() throws IOException {
+            try {
+              return in.readUnsignedByte(); // fix for bug 47249
+            } catch (EOFException ignored) {
+              return -1;
             }
-
-          };
-        }
-
-        ObjectInput ois = new DSObjectInputStream(stream);
-        serializationFilter.setFilterOn((ObjectInputStream) ois);
-        if (stream instanceof VersionedDataStream) {
-          Version v = ((VersionedDataStream) stream).getVersion();
-          if (v != null && v != Version.CURRENT) {
-            ois = new VersionedObjectInput(ois, v);
           }
-        }
 
-        serializableResult = ois.readObject();
+        };
+      }
 
-        if (isDebugEnabled_SERIALIZER) {
-          logger.trace(LogMarker.SERIALIZER_VERBOSE, "Read Serializable object: {}",
-              serializableResult);
+      ObjectInput ois = new DSObjectInputStream(stream);
+      serializationFilter.setFilterOn((ObjectInputStream) ois);
+      if (stream instanceof VersionedDataStream) {
+        Version v = ((VersionedDataStream) stream).getVersion();
+        if (Version.CURRENT != v && v != null) {
+          ois = new VersionedObjectInput(ois, v);
         }
       }
+
+      serializableResult = (Serializable) ois.readObject();
+
       if (isDebugEnabled_SERIALIZER) {
-        logger.trace(LogMarker.SERIALIZER_VERBOSE, "deserialized instanceof {}",
-            serializableResult.getClass());
+        logger.trace(LogMarker.SERIALIZER_VERBOSE, "Read Serializable object: {}",
+            serializableResult);
       }
-      return serializableResult;
     }
-    if (header == DSCODE.PDX.toByte()) {
-      return readPdxSerializable(in);
-    }
-    if (header == DSCODE.PDX_ENUM.toByte()) {
-      return readPdxEnum(in);
-    }
-    if (header == DSCODE.GEMFIRE_ENUM.toByte()) {
-      return readGemFireEnum(in);
-    }
-    if (header == DSCODE.PDX_INLINE_ENUM.toByte()) {
-      return readPdxInlineEnum(in);
-    }
-    if (header == DSCODE.BIG_INTEGER.toByte()) {
-      return readBigInteger(in);
-    }
-    if (header == DSCODE.BIG_DECIMAL.toByte()) {
-      return readBigDecimal(in);
-    }
-    if (header == DSCODE.UUID.toByte()) {
-      return readUUID(in);
-    }
-    if (header == DSCODE.TIMESTAMP.toByte()) {
-      return readTimestamp(in);
+    if (isDebugEnabled_SERIALIZER) {
+      logger.trace(LogMarker.SERIALIZER_VERBOSE, "deserialized instanceof {}",
+          serializableResult.getClass());
     }
-
-    String s = "Unknown header byte: " + header;
-    throw new IOException(s);
+    return serializableResult;
   }
 
   private static Object readUserDataSerializable(final DataInput in, int classId)
@@ -3123,23 +2947,17 @@ public abstract class InternalDataSerializer extends DataSerializer {
     }
   }
 
-  private static final ThreadLocal<Boolean> pdxSerializationInProgress = new ThreadLocal<>();
-
   public static boolean isPdxSerializationInProgress() {
     Boolean v = pdxSerializationInProgress.get();
     return v != null && v;
   }
 
-  public static void setPdxSerializationInProgress(boolean v) {
-    if (v) {
-      pdxSerializationInProgress.set(true);
-    } else {
-      pdxSerializationInProgress.set(false);
-    }
+  public static void setPdxSerializationInProgress(boolean inProgress) {
+    pdxSerializationInProgress.set(inProgress);
   }
 
   public static boolean writePdx(DataOutput out, InternalCache internalCache, Object pdx,
-      PdxSerializer pdxSerializer) throws IOException {
+                                 PdxSerializer pdxSerializer) throws IOException {
     TypeRegistry tr = null;
     if (internalCache != null) {
       tr = internalCache.getPdxRegistry();
@@ -3327,63 +3145,371 @@ public abstract class InternalDataSerializer extends DataSerializer {
     return supportedClassesToHolders;
   }
 
-  /**
-   * A marker object for {@code DataSerializer}s that have not been registered. Using this marker
-   * object allows us to asynchronously send {@code DataSerializer} registration updates. If the
-   * serialized bytes arrive at a VM before the registration message does, the deserializer will
-   * wait an amount of time for the registration message to arrive.
-   */
-  abstract static class Marker {
-    /**
-     * The DataSerializer that is filled in upon registration
-     */
-    protected DataSerializer serializer = null;
-
-    /**
-     * set to true once setSerializer is called.
-     */
-    boolean hasBeenSet = false;
-
-    abstract DataSerializer getSerializer();
-
-    /**
-     * Sets the serializer associated with this marker. It will notify any threads that are waiting
-     * for the serializer to be registered.
-     */
-    void setSerializer(DataSerializer serializer) {
-      synchronized (this) {
-        this.hasBeenSet = true;
-        this.serializer = serializer;
-        this.notifyAll();
+  public static void writeObjectArray(Object[] array, DataOutput out, boolean ensureCompatibility)
+      throws IOException {
+    InternalDataSerializer.checkOut(out);
+    int length = -1;
+    if (array != null) {
+      length = array.length;
+    }
+    InternalDataSerializer.writeArrayLength(length, out);
+    if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
+      logger.trace(LogMarker.SERIALIZER_VERBOSE, "Writing Object array of length {}", length);
+    }
+    if (length >= 0) {
+      writeClass(array.getClass().getComponentType(), out);
+      for (int i = 0; i < length; i++) {
+        basicWriteObject(array[i], out, ensureCompatibility);
       }
     }
   }
 
   /**
-   * A marker object for {@code DataSerializer}s that have not been registered. Using this marker
-   * object allows us to asynchronously send {@code DataSerializer} registration updates. If the
-   * serialized bytes arrive at a VM before the registration message does, the deserializer will
-   * wait an amount of time for the registration message to arrive. Made public for unit test
-   * access.
-   *
-   * @since GemFire 5.7
+   * Write a variable length long the old way (pre 7.0). Use this only in contexts where you might
+   * need to communicate with pre 7.0 members or files.
    */
-  public static class GetMarker extends Marker {
-    /**
-     * Number of milliseconds to wait. Also used by InternalInstantiator. Note that some tests set
-     * this to a small amount to speed up failures. Made public for unit test access.
-     */
-    public static int WAIT_MS = Integer.getInteger(
-        DistributionConfig.GEMFIRE_PREFIX + "InternalDataSerializer.WAIT_MS", 60 * 1000);
+  public static void writeVLOld(long data, DataOutput out) throws IOException {
+    if (data < 0) {
+      Assert.fail("Data expected to be >=0 is " + data);
+    }
+    if (data <= MAX_BYTE_VL) {
+      out.writeByte((byte) data);
+    } else if (data <= 0x7FFF) {
+      // set the sign bit to indicate a short
+      out.write(((int) data >>> 8 | 0x80) & 0xFF);
+      out.write((int) data >>> 0 & 0xFF);
+    } else if (data <= Integer.MAX_VALUE) {
+      out.writeByte(INT_VL);
+      out.writeInt((int) data);
+    } else {
+      out.writeByte(LONG_VL);
+      out.writeLong(data);
+    }
+  }
 
-    /**
-     * Returns the serializer associated with this marker. If the serializer has not been registered
-     * yet, then this method will wait until the serializer is registered. If this method has to
-     * wait for too long, then {@code null} is returned.
-     */
-    @Override
-    DataSerializer getSerializer() {
-      synchronized (this) {
+  /**
+   * Read a variable length long the old way (pre 7.0). Use this only in contexts where you might
+   * need to communicate with pre 7.0 members or files.
+   */
+  public static long readVLOld(DataInput in) throws IOException {
+    byte code = in.readByte();
+    long result;
+    if (code < 0) {
+      // mask off sign bit
+      result = code & 0x7F;
+      result <<= 8;
+      result |= in.readByte() & 0xFF;
+    } else if (code <= MAX_BYTE_VL) {
+      result = code;
+    } else if (code == INT_VL) {
+      result = in.readInt();
+    } else if (code == LONG_VL) {
+      result = in.readLong();
+    } else {
+      throw new IllegalStateException("unexpected variable length code=" + code);
+    }
+    return result;
+  }
+
+  /**
+   * Encode a long as a variable length array.
+   *
+   * This method is appropriate for unsigned integers. For signed integers, negative values will
+   * always consume 10 bytes, so it is recommended to use writeSignedVL instead.
+   *
+   * This is taken from the varint encoding in protobufs (BSD licensed). See
+   * https://developers.google.com/protocol-buffers/docs/encoding
+   */
+  public static void writeUnsignedVL(long data, DataOutput out) throws IOException {
+    while (true) {
+      if ((data & ~0x7FL) == 0) {
+        out.writeByte((int) data);
+        return;
+      } else {
+        out.writeByte((int) data & 0x7F | 0x80);
+        data >>>= 7;
+      }
+    }
+  }
+
+  /**
+   * Decode a long as a variable length array.
+   *
+   * This is taken from the varint encoding in protobufs (BSD licensed). See
+   * https://developers.google.com/protocol-buffers/docs/encoding
+   */
+  public static long readUnsignedVL(DataInput in) throws IOException {
+    int shift = 0;
+    long result = 0;
+    while (shift < 64) {
+      final byte b = in.readByte();
+      result |= (long) (b & 0x7F) << shift;
+      if ((b & 0x80) == 0) {
+        return result;
+      }
+      shift += 7;
+    }
+    throw new GemFireIOException("Malformed variable length integer");
+  }
+
+  /**
+   * Encode a signed long as a variable length array.
+   *
+   * This method is appropriate for signed integers. It uses zig zag encoding so that negative
+   * numbers will be represented more compactly. For unsigned values, writeUnsignedVL will be more
+   * efficient.
+   */
+  public static void writeSignedVL(long data, DataOutput out) throws IOException {
+    writeUnsignedVL(encodeZigZag64(data), out);
+  }
+
+  /**
+   * Decode a signed long from a variable length array.
+   *
+   * This method is appropriate for signed integers. It uses zig zag encoding so that negative
+   * numbers will be represented more compactly. For unsigned values, readUnsignedVL will be more
+   * efficient.
+   */
+  public static long readSignedVL(DataInput in) throws IOException {
+    return decodeZigZag64(readUnsignedVL(in));
+  }
+
+  /**
+   * Decode a ZigZag-encoded 64-bit value. ZigZag encodes signed integers into values that can be
+   * efficiently encoded with varint. (Otherwise, negative values must be sign-extended to 64 bits
+   * to be varint encoded, thus always taking 10 bytes on the wire.)
+   * @param n An unsigned 64-bit integer, stored in a signed int because Java has no explicit
+   * unsigned support.
+   * @return A signed 64-bit integer.
+   *
+   * This is taken from the varint encoding in protobufs (BSD licensed). See
+   * https://developers.google.com/protocol-buffers/docs/encoding
+   */
+  private static long decodeZigZag64(final long n) {
+    return n >>> 1 ^ -(n & 1);
+  }
+
+  /**
+   * Encode a signed 64-bit value as ZigZag. ZigZag encodes signed integers into values that can be
+   * efficiently encoded with varint. (Otherwise, negative values must be sign-extended to 64 bits
+   * to be varint encoded, thus always taking 10 bytes on the wire.)
+   * @param n A signed 64-bit integer.
+   * @return An unsigned 64-bit integer, stored in a signed int because Java has no explicit
+   * unsigned support.
+   *
+   * This is taken from the varint encoding in protobufs (BSD licensed). See
+   * https://developers.google.com/protocol-buffers/docs/encoding
+   */
+  private static long encodeZigZag64(final long n) {
+    // Note: the right-shift must be arithmetic
+    return n << 1 ^ n >> 63;
+  }
+
+  /* test only method */
+  public static int calculateBytesForTSandDSID(int dsid) {
+    HeapDataOutputStream out = new HeapDataOutputStream(4 + 8, Version.CURRENT);
+    long now = System.currentTimeMillis();
+    try {
+      writeUnsignedVL(now, out);
+      writeUnsignedVL(InternalDataSerializer.encodeZigZag64(dsid), out);
+    } catch (IOException ignored) {
+      return 0;
+    }
+    return out.size();
+  }
+
+  public static Class<?> getCachedClass(String p_className) throws ClassNotFoundException {
+    String className = processIncomingClassName(p_className);
+    if (LOAD_CLASS_EACH_TIME) {
+      return ClassPathLoader.getLatest().forName(className);
+    } else {
+      Class<?> result = getExistingCachedClass(className);
+      if (result == null) {
+        // Do the forName call outside the sync to fix bug 46172
+        result = ClassPathLoader.getLatest().forName(className);
+        synchronized (cacheAccessLock) {
+          Class<?> cachedClass = getExistingCachedClass(className);
+          if (cachedClass == null) {
+            classCache.put(className, new WeakReference<>(result));
+          } else {
+            result = cachedClass;
+          }
+        }
+      }
+      return result;
+    }
+  }
+
+  private static Class<?> getExistingCachedClass(String className) {
+    WeakReference<Class<?>> wr = classCache.get(className);
+    Class<?> result = null;
+    if (wr != null) {
+      result = wr.get();
+    }
+    return result;
+  }
+
+  public static void flushClassCache() {
+    if (classCache != null) {
+      // Not locking classCache during clear as doing so causes a deadlock in the DeployedJar
+      classCache.clear();
+    }
+  }
+
+  /**
+   * Serializes a list of Integers. The argument may be null. Deserialize with
+   * readListOfIntegers().
+   *
+   * TODO: writeListOfIntegers is unused
+   */
+  public void writeListOfIntegers(List<Integer> list, DataOutput out) throws IOException {
+    int size = -1;
+    if (list != null) {
+      size = list.size();
+    }
+    InternalDataSerializer.writeArrayLength(size, out);
+    for (int i = 0; i < size; i++) {
+      out.writeInt(list.get(i));
+    }
+  }
+
+  /**
+   * Any time new serialization format is added then a new enum needs to be added here.
+   * @since GemFire 6.6.2
+   */
+  private enum SERIALIZATION_VERSION {
+    vINVALID,
+    // includes 6.6.0.x and 6.6.1.x. Note that no serialization changes were made in 6.6 until 6.6.2
+    v660,
+    // 6.6.2.x or later NOTE if you add a new constant make sure and update "latestVersion".
+    v662
+  }
+
+  /**
+   * A listener whose listener methods are invoked when {@link DataSerializer}s and {@link
+   * Instantiator}s are registered. This is part of the fix for bug 31422.
+   * @see InternalDataSerializer#addRegistrationListener
+   * @see InternalDataSerializer#removeRegistrationListener
+   */
+  public interface RegistrationListener {
+
+    /**
+     * Invoked when a new {@code Instantiator} is {@linkplain Instantiator#register(Instantiator)
+     * registered}.
+     */
+    void newInstantiator(Instantiator instantiator);
+
+    /**
+     * Invoked when a new {@code DataSerializer} is {@linkplain DataSerializer#register(Class)
+     * registered}.
+     */
+    void newDataSerializer(DataSerializer ds);
+  }
+
+  /**
+   * A SerializerAttributesHolder holds information required to load a DataSerializer and exists to
+   * allow client/server connections to be created more quickly than they would if the
+   * DataSerializer information downloaded from the server were used to immediately load the
+   * corresponding classes.
+   */
+  public static class SerializerAttributesHolder {
+    private String className = "";
+    private EventID eventId = null;
+    private ClientProxyMembershipID proxyId = null;
+    private int id = 0;
+
+    SerializerAttributesHolder() {
+    }
+
+    SerializerAttributesHolder(String name, EventID event, ClientProxyMembershipID proxy, int id) {
+      this.className = name;
+      this.eventId = event;
+      this.proxyId = proxy;
+      this.id = id;
+    }
+
+    /**
+     * @return String the classname of the data serializer this instance represents.
+     */
+    public String getClassName() {
+      return this.className;
+    }
+
+    public EventID getEventId() {
+      return this.eventId;
+    }
+
+    public ClientProxyMembershipID getProxyId() {
+      return this.proxyId;
+    }
+
+    public int getId() {
+      return this.id;
+    }
+
+    @Override
+    public String toString() {
+      return "SerializerAttributesHolder[name=" + this.className + ",id=" + this.id + ",eventId="
+          + this.eventId + ']';
+    }
+  }
+
+  /**
+   * A marker object for {@code DataSerializer}s that have not been registered. Using this marker
+   * object allows us to asynchronously send {@code DataSerializer} registration updates. If the
+   * serialized bytes arrive at a VM before the registration message does, the deserializer will
+   * wait an amount of time for the registration message to arrive.
+   */
+  abstract static class Marker {
+    /**
+     * The DataSerializer that is filled in upon registration
+     */
+    protected DataSerializer serializer = null;
+
+    /**
+     * set to true once setSerializer is called.
+     */
+    boolean hasBeenSet = false;
+
+    abstract DataSerializer getSerializer();
+
+    /**
+     * Sets the serializer associated with this marker. It will notify any threads that are waiting
+     * for the serializer to be registered.
+     */
+    void setSerializer(DataSerializer serializer) {
+      synchronized (this) {
+        this.hasBeenSet = true;
+        this.serializer = serializer;
+        this.notifyAll();
+      }
+    }
+  }
+
+  /**
+   * A marker object for {@code DataSerializer}s that have not been registered. Using this marker
+   * object allows us to asynchronously send {@code DataSerializer} registration updates. If the
+   * serialized bytes arrive at a VM before the registration message does, the deserializer will
+   * wait an amount of time for the registration message to arrive. Made public for unit test
+   * access.
+   * @since GemFire 5.7
+   */
+  public static class GetMarker extends Marker {
+    /**
+     * Number of milliseconds to wait. Also used by InternalInstantiator. Note that some tests set
+     * this to a small amount to speed up failures. Made public for unit test access.
+     */
+    public static int WAIT_MS = Integer.getInteger(
+        DistributionConfig.GEMFIRE_PREFIX + "InternalDataSerializer.WAIT_MS", 60 * 1000);
+
+    /**
+     * Returns the serializer associated with this marker. If the serializer has not been registered
+     * yet, then this method will wait until the serializer is registered. If this method has to
+     * wait for too long, then {@code null} is returned.
+     */
+    @Override
+    DataSerializer getSerializer() {
+      synchronized (this) {
         boolean firstTime = true;
         long endTime = 0;
         while (!this.hasBeenSet) {
@@ -3415,7 +3541,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
   /**
    * A marker object for {@code DataSerializer}s that is in the process of being registered. It is
    * possible for getSerializer to return {@code null}
-   *
    * @since GemFire 5.7
    */
   static class InitMarker extends Marker {
@@ -3452,29 +3577,27 @@ public abstract class InternalDataSerializer extends DataSerializer {
    */
   public static class RegistrationMessage extends SerialDistributionMessage {
     /**
-     * The id of the {@code DataSerializer} that was registered since 5.7 an int instead of a byte
+     * The versions in which this message was modified
      */
-    private int id;
-
+    private static final Version[] dsfidVersions = new Version[]{};
     /**
      * The eventId of the {@code DataSerializer} that was registered
      */
-    protected EventID eventId;
-
+    protected EventID eventId;
+    /**
+     * The id of the {@code DataSerializer} that was registered. Since 5.7 an int instead of a byte
+     */
+    private int id;
     /**
      * The name of the {@code DataSerializer} class
      */
     private String className;
 
     /**
-     * The versions in which this message was modified
-     */
-    private static final Version[] dsfidVersions = new Version[] {};
-
-    /**
      * Constructor for {@code DataSerializable}
      */
-    public RegistrationMessage() {}
+    public RegistrationMessage() {
+    }
 
     /**
      * Creates a new {@code RegistrationMessage} that broadcasts that the given {@code
@@ -3592,28 +3715,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
   }
 
   /**
-   * A listener whose listener methods are invoked when {@link DataSerializer}s and {@link
-   * Instantiator}s are registered. This is part of the fix for bug 31422.
-   *
-   * @see InternalDataSerializer#addRegistrationListener
-   * @see InternalDataSerializer#removeRegistrationListener
-   */
-  public interface RegistrationListener {
-
-    /**
-     * Invoked when a new {@code Instantiator} is {@linkplain Instantiator#register(Instantiator)
-     * registered}.
-     */
-    void newInstantiator(Instantiator instantiator);
-
-    /**
-     * Invoked when a new {@code DataSerializer} is {@linkplain DataSerializer#register(Class)
-     * registered}.
-     */
-    void newDataSerializer(DataSerializer ds);
-  }
-
-  /**
    * An {@code ObjectInputStream} whose {@link #resolveClass} method loads classes from the current
    * context class loader.
    */
@@ -3685,7 +3786,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
   /**
    * Used to implement serialization code for the well known classes we support in DataSerializer.
-   *
    * @since GemFire 5.7
    */
   protected abstract static class WellKnownDS extends DataSerializer {
@@ -3716,235 +3816,4 @@ public abstract class InternalDataSerializer extends DataSerializer {
   protected abstract static class WellKnownPdxDS extends WellKnownDS {
     // subclasses need to implement toData
   }
-
-  public static void writeObjectArray(Object[] array, DataOutput out, boolean ensureCompatibility)
-      throws IOException {
-    InternalDataSerializer.checkOut(out);
-    int length;
-    if (array == null) {
-      length = -1;
-    } else {
-      length = array.length;
-    }
-    InternalDataSerializer.writeArrayLength(length, out);
-    if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
-      logger.trace(LogMarker.SERIALIZER_VERBOSE, "Writing Object array of length {}", length);
-    }
-    if (length >= 0) {
-      writeClass(array.getClass().getComponentType(), out);
-      for (int i = 0; i < length; i++) {
-        basicWriteObject(array[i], out, ensureCompatibility);
-      }
-    }
-  }
-
-  // Variable Length long encoded as int in next 4 bytes
-  private static final byte INT_VL = 126;
-
-  // Variable Length long encoded as long in next 8 bytes
-  private static final byte LONG_VL = 127;
-
-  private static final int MAX_BYTE_VL = 125;
-
-  /**
-   * Write a variable length long the old way (pre 7.0). Use this only in contexts where you might
-   * need to communicate with pre 7.0 members or files.
-   */
-  public static void writeVLOld(long data, DataOutput out) throws IOException {
-    if (data < 0) {
-      Assert.fail("Data expected to be >=0 is " + data);
-    }
-    if (data <= MAX_BYTE_VL) {
-      out.writeByte((byte) data);
-    } else if (data <= 0x7FFF) {
-      // set the sign bit to indicate a short
-      out.write(((int) data >>> 8 | 0x80) & 0xFF);
-      out.write((int) data >>> 0 & 0xFF);
-    } else if (data <= Integer.MAX_VALUE) {
-      out.writeByte(INT_VL);
-      out.writeInt((int) data);
-    } else {
-      out.writeByte(LONG_VL);
-      out.writeLong(data);
-    }
-  }
-
-  /**
-   * Write a variable length long the old way (pre 7.0). Use this only in contexts where you might
-   * need to communicate with pre 7.0 members or files.
-   */
-  public static long readVLOld(DataInput in) throws IOException {
-    byte code = in.readByte();
-    long result;
-    if (code < 0) {
-      // mask off sign bit
-      result = code & 0x7F;
-      result <<= 8;
-      result |= in.readByte() & 0xFF;
-    } else if (code <= MAX_BYTE_VL) {
-      result = code;
-    } else if (code == INT_VL) {
-      result = in.readInt();
-    } else if (code == LONG_VL) {
-      result = in.readLong();
-    } else {
-      throw new IllegalStateException("unexpected variable length code=" + code);
-    }
-    return result;
-  }
-
-  /**
-   * Encode a long as a variable length array.
-   *
-   * This method is appropriate for unsigned integers. For signed integers, negative values will
-   * always consume 10 bytes, so it is recommended to use writeSignedVL instead.
-   *
-   * This is taken from the varint encoding in protobufs (BSD licensed). See
-   * https://developers.google.com/protocol-buffers/docs/encoding
-   */
-  public static void writeUnsignedVL(long data, DataOutput out) throws IOException {
-    while (true) {
-      if ((data & ~0x7FL) == 0) {
-        out.writeByte((int) data);
-        return;
-      } else {
-        out.writeByte((int) data & 0x7F | 0x80);
-        data >>>= 7;
-      }
-    }
-  }
-
-  /**
-   * Decode a long as a variable length array.
-   *
-   * This is taken from the varint encoding in protobufs (BSD licensed). See
-   * https://developers.google.com/protocol-buffers/docs/encoding
-   */
-  public static long readUnsignedVL(DataInput in) throws IOException {
-    int shift = 0;
-    long result = 0;
-    while (shift < 64) {
-      final byte b = in.readByte();
-      result |= (long) (b & 0x7F) << shift;
-      if ((b & 0x80) == 0) {
-        return result;
-      }
-      shift += 7;
-    }
-    throw new GemFireIOException("Malformed variable length integer");
-  }
-
-  /**
-   * Encode a signed long as a variable length array.
-   *
-   * This method is appropriate for signed integers. It uses zig zag encoding to so that negative
-   * numbers will be represented more compactly. For unsigned values, writeUnsignedVL will be more
-   * efficient.
-   */
-  public static void writeSignedVL(long data, DataOutput out) throws IOException {
-    writeUnsignedVL(encodeZigZag64(data), out);
-  }
-
-  /**
-   * Decode a signed long as a variable length array.
-   *
-   * This method is appropriate for signed integers. It uses zig zag encoding to so that negative
-   * numbers will be represented more compactly. For unsigned values, writeUnsignedVL will be more
-   * efficient.
-   */
-  public static long readSignedVL(DataInput in) throws IOException {
-    return decodeZigZag64(readUnsignedVL(in));
-  }
-
-  /**
-   * Decode a ZigZag-encoded 64-bit value. ZigZag encodes signed integers into values that can be
-   * efficiently encoded with varint. (Otherwise, negative values must be sign-extended to 64 bits
-   * to be varint encoded, thus always taking 10 bytes on the wire.)
-   *
-   * @param n An unsigned 64-bit integer, stored in a signed int because Java has no explicit
-   *        unsigned support.
-   * @return A signed 64-bit integer.
-   *
-   *         This is taken from the varint encoding in protobufs (BSD licensed). See
-   *         https://developers.google.com/protocol-buffers/docs/encoding
-   */
-  private static long decodeZigZag64(final long n) {
-    return n >>> 1 ^ -(n & 1);
-  }
-
-  /**
-   * Encode a ZigZag-encoded 64-bit value. ZigZag encodes signed integers into values that can be
-   * efficiently encoded with varint. (Otherwise, negative values must be sign-extended to 64 bits
-   * to be varint encoded, thus always taking 10 bytes on the wire.)
-   *
-   * @param n A signed 64-bit integer.
-   * @return An unsigned 64-bit integer, stored in a signed int because Java has no explicit
-   *         unsigned support.
-   *
-   *         This is taken from the varint encoding in protobufs (BSD licensed). See
-   *         https://developers.google.com/protocol-buffers/docs/encoding
-   */
-  private static long encodeZigZag64(final long n) {
-    // Note: the right-shift must be arithmetic
-    return n << 1 ^ n >> 63;
-  }
-
-  /* test only method */
-  public static int calculateBytesForTSandDSID(int dsid) {
-    HeapDataOutputStream out = new HeapDataOutputStream(4 + 8, Version.CURRENT);
-    long now = System.currentTimeMillis();
-    try {
-      writeUnsignedVL(now, out);
-      writeUnsignedVL(InternalDataSerializer.encodeZigZag64(dsid), out);
-    } catch (IOException ignored) {
-      return 0;
-    }
-    return out.size();
-  }
-
-  public static final boolean LOAD_CLASS_EACH_TIME =
-      Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "loadClassOnEveryDeserialization");
-
-  private static final CopyOnWriteHashMap<String, WeakReference<Class<?>>> classCache =
-      LOAD_CLASS_EACH_TIME ? null : new CopyOnWriteHashMap<>();
-
-  private static final Object cacheAccessLock = new Object();
-
-  public static Class<?> getCachedClass(String p_className) throws ClassNotFoundException {
-    String className = processIncomingClassName(p_className);
-    if (LOAD_CLASS_EACH_TIME) {
-      return ClassPathLoader.getLatest().forName(className);
-    } else {
-      Class<?> result = getExistingCachedClass(className);
-      if (result == null) {
-        // Do the forName call outside the sync to fix bug 46172
-        result = ClassPathLoader.getLatest().forName(className);
-        synchronized (cacheAccessLock) {
-          Class<?> cachedClass = getExistingCachedClass(className);
-          if (cachedClass == null) {
-            classCache.put(className, new WeakReference<>(result));
-          } else {
-            result = cachedClass;
-          }
-        }
-      }
-      return result;
-    }
-  }
-
-  private static Class<?> getExistingCachedClass(String className) {
-    WeakReference<Class<?>> wr = classCache.get(className);
-    Class<?> result = null;
-    if (wr != null) {
-      result = wr.get();
-    }
-    return result;
-  }
-
-  public static void flushClassCache() {
-    if (classCache != null) {
-      // Not locking classCache during clear as doing so causes a deadlock in the DeployedJar
-      classCache.clear();
-    }
-  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/util/DscodeHelper.java b/geode-core/src/main/java/org/apache/geode/internal/util/DscodeHelper.java
new file mode 100644
index 0000000..57d7ae2
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/util/DscodeHelper.java
@@ -0,0 +1,35 @@
+package org.apache.geode.internal.util;
+
+import java.util.Arrays;
+
+import org.apache.geode.internal.DSCODE;
+
+/**
+ * Maps a raw header byte to its {@link DSCODE} constant via a precomputed lookup table.
+ */
+public class DscodeHelper {
+
+  /** Lookup table indexed by the non-negative byte value of each {@link DSCODE}. */
+  private static final DSCODE[] dscodes = new DSCODE[128];
+
+  static {
+    // Codes with a negative byte value cannot index the table, so they are left out.
+    Arrays.stream(DSCODE.values()).filter(dscode -> dscode.toByte() >= 0)
+        .forEach(dscode -> dscodes[dscode.toByte()] = dscode);
+  }
+
+  /**
+   * Looks up the {@link DSCODE} for a byte value.
+   *
+   * @param value the byte value to look up
+   * @return the matching {@code DSCODE}, or {@code null} when {@code value} is negative or no
+   *         code with that byte value was loaded into the table
+   */
+  public static DSCODE toDSCODE(final byte value) {
+    // A negative byte would index outside the table; treat it as unmapped instead of throwing.
+    if (value < 0) {
+      return null;
+    }
+    return dscodes[value];
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/DSCODETest.java b/geode-core/src/test/java/org/apache/geode/internal/DSCODETest.java
index 7bc7684..d4007bb 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/DSCODETest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/DSCODETest.java
@@ -16,12 +16,15 @@ package org.apache.geode.internal;
 
 import static org.junit.Assert.assertFalse;
 
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.geode.internal.util.DscodeHelper;
 import org.apache.geode.test.junit.categories.SerializationTest;
 
 @Category({SerializationTest.class})
@@ -36,4 +39,11 @@ public class DSCODETest {
       previouslySeen.add(integerValue);
     }
   }
+
+  @Test
+  public void testGetEnumFromByte() {
+    Arrays.stream(DSCODE.values())
+        .filter(dscode -> dscode != DSCODE.RESERVED_FOR_FUTURE_USE && dscode != DSCODE.ILLEGAL)
+        .forEach(dscode -> Assert.assertEquals(dscode, DscodeHelper.toDSCODE(dscode.toByte())));
+  }
 }