You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2018/09/07 23:52:33 UTC

[geode] branch feature/GEODE-5705 updated (c7878ff -> 4214a60)

This is an automated email from the ASF dual-hosted git repository.

udo pushed a change to branch feature/GEODE-5705
in repository https://gitbox.apache.org/repos/asf/geode.git.


    omit c7878ff  GEODE-5705: fixed logic to read Strings avoid double switch statements. Cleaned up more locations where switch statements could have been used.
    omit 4b41453  GEODE-5705: Fix test to not test Illegal DSCodes
    omit eeb70cd  GEODE-5705: Improve basicReadObject to use switch statement rather than if statements.
     new 4214a60  GEODE-5705: Improve basicReadObject to use switch statement rather than if statements.

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (c7878ff)
            \
             N -- N -- N   refs/heads/feature/GEODE-5705 (4214a60)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../geode/internal/InternalDataSerializer.java     | 120 +++++++--------------
 1 file changed, 37 insertions(+), 83 deletions(-)


[geode] 01/01: GEODE-5705: Improve basicReadObject to use switch statement rather than if statements.

Posted by ud...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

udo pushed a commit to branch feature/GEODE-5705
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 4214a601bcc273f8950dfa5c5e9543be5cef8e4c
Author: Udo Kohlmeyer <uk...@pivotal.io>
AuthorDate: Thu Sep 6 17:14:03 2018 -0700

    GEODE-5705: Improve basicReadObject to use switch statement rather
    than if statements.
---
 .../geode/internal/InternalDataSerializer.java     | 1671 +++++++++-----------
 .../apache/geode/internal/util/DscodeHelper.java   |   19 +
 .../java/org/apache/geode/internal/DSCODETest.java |   10 +
 3 files changed, 799 insertions(+), 901 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java b/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
index 72a1bc0..fc3722a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
@@ -106,6 +106,7 @@ import org.apache.geode.internal.lang.ClassUtils;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LocalizedMessage;
 import org.apache.geode.internal.logging.log4j.LogMarker;
+import org.apache.geode.internal.util.DscodeHelper;
 import org.apache.geode.internal.util.concurrent.CopyOnWriteHashMap;
 import org.apache.geode.pdx.NonPortableClassException;
 import org.apache.geode.pdx.PdxInstance;
@@ -127,19 +128,24 @@ import org.apache.geode.pdx.internal.TypeRegistry;
  * Contains static methods for data serializing instances of internal GemFire classes. It also
  * contains the implementation of the distribution messaging (and shared memory management) needed
  * to support data serialization.
- *
  * @since GemFire 3.5
  */
 public abstract class InternalDataSerializer extends DataSerializer {
+  // array is null
+  public static final byte NULL_ARRAY = -1;
+  /**
+   * array len encoded as int in next 4 bytes
+   * @since GemFire 5.7
+   */
+  public static final byte INT_ARRAY_LEN = -3;
+  public static final boolean LOAD_CLASS_EACH_TIME =
+      Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "loadClassOnEveryDeserialization");
   private static final Logger logger = LogService.getLogger();
-
   /**
    * Maps Class names to their DataSerializer. This is used to find a DataSerializer during
    * serialization.
    */
   private static final Map<String, DataSerializer> classesToSerializers = new ConcurrentHashMap<>();
-
-
   /**
    * This list contains classes that Geode's classes subclass, such as antlr AST classes which are
    * used by our Object Query Language. It also contains certain classes that are DataSerializable
@@ -191,27 +197,84 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
           // geode-modules
           + ";org.apache.geode.modules.util.SessionCustomExpiry" + ";";
-
-
+  private static final String serializationVersionTxt =
+      System.getProperty(DistributionConfig.GEMFIRE_PREFIX + "serializationVersion");
+  /**
+   * Change this constant to be the last one in SERIALIZATION_VERSION
+   */
+  private static final SERIALIZATION_VERSION latestVersion = SERIALIZATION_VERSION.v662;
+  private static final SERIALIZATION_VERSION serializationVersion = calculateSerializationVersion();
+  /**
+   * Maps the id of a serializer to its {@code DataSerializer}.
+   */
+  private static final ConcurrentMap/* <Integer, DataSerializer|Marker> */ idsToSerializers =
+      new ConcurrentHashMap();
+  /**
+   * Contains the classnames of the data serializers (and not the supported classes) not yet loaded
+   * into the vm as keys and their corresponding holder instances as values.
+   */
+  private static final ConcurrentHashMap<String, SerializerAttributesHolder> dsClassesToHolders =
+      new ConcurrentHashMap<>();
+  /**
+   * Contains the id of the data serializers not yet loaded into the vm as keys and their
+   * corresponding holder instances as values.
+   */
+  private static final ConcurrentHashMap<Integer, SerializerAttributesHolder> idsToHolders =
+      new ConcurrentHashMap<>();
+  /**
+   * Contains the classnames of supported classes as keys and their corresponding
+   * SerializerAttributesHolder instances as values. This applies only to the data serializers which
+   * have not been loaded into the vm.
+   */
+  private static final ConcurrentHashMap<String, SerializerAttributesHolder>
+      supportedClassesToHolders =
+      new ConcurrentHashMap<>();
+  private static final Object listenersSync = new Object();
+  private static final byte TIME_UNIT_NANOSECONDS = -1;
+  private static final byte TIME_UNIT_MICROSECONDS = -2;
+  private static final byte TIME_UNIT_MILLISECONDS = -3;
+  private static final byte TIME_UNIT_SECONDS = -4;
+  private static final ConcurrentMap dsfidToClassMap =
+      logger.isTraceEnabled(LogMarker.SERIALIZER_WRITE_DSFID_VERBOSE) ? new ConcurrentHashMap()
+          : null;
+  /**
+   * array len encoded as unsigned short in next 2 bytes
+   * @since GemFire 5.7
+   */
+  private static final byte SHORT_ARRAY_LEN = -2;
+  private static final int MAX_BYTE_ARRAY_LEN = (byte) -4 & 0xFF;
+  private static final ThreadLocal<Boolean> pdxSerializationInProgress = new ThreadLocal<>();
+  // Variable Length long encoded as int in next 4 bytes
+  private static final byte INT_VL = 126;
+  // Variable Length long encoded as long in next 8 bytes
+  private static final byte LONG_VL = 127;
+  private static final int MAX_BYTE_VL = 125;
+  private static final CopyOnWriteHashMap<String, WeakReference<Class<?>>> classCache =
+      LOAD_CLASS_EACH_TIME ? null : new CopyOnWriteHashMap<>();
+  private static final Object cacheAccessLock = new Object();
   private static InputStreamFilter defaultSerializationFilter = new EmptyInputStreamFilter();
-
   /**
    * A deserialization filter for ObjectInputStreams
    */
   private static InputStreamFilter serializationFilter = defaultSerializationFilter;
-
-  private static final String serializationVersionTxt =
-      System.getProperty(DistributionConfig.GEMFIRE_PREFIX + "serializationVersion");
-
   /**
    * support for old GemFire clients and WAN sites - needed to enable moving from GemFire to Geode
    */
   private static OldClientSupportService oldClientSupportService;
+  /**
+   * {@code RegistrationListener}s that receive callbacks when {@code DataSerializer}s and {@code
+   * Instantiator}s are registered. Note: copy-on-write access used for this set
+   */
+  private static volatile Set listeners = new HashSet();
+  private static DataSerializer dvddeserializer;
+
+  static {
+    initializeWellKnownSerializers();
+  }
 
   /**
    * For backward compatibility we must swizzle the package of some classes that had to be moved
    * when GemFire was open- sourced. This preserves backward-compatibility.
-   *
    * @param name the fully qualified class name
    * @return the name of the class in this implementation
    */
@@ -233,7 +296,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
   /**
    * For backward compatibility we must swizzle the package of some classes that had to be moved
    * when GemFire was open- sourced. This preserves backward-compatibility.
-   *
    * @param name the fully qualified class name
    * @param out the consumer of the serialized object
    * @return the name of the class in this implementation
@@ -256,12 +318,11 @@ public abstract class InternalDataSerializer extends DataSerializer {
   /**
    * Initializes the optional serialization "accept list" if the user has requested it in the
    * DistributionConfig
-   *
    * @param distributionConfig the DistributedSystem configuration
    * @param services DistributedSystem services that might have classes to acceptlist
    */
   public static void initialize(DistributionConfig distributionConfig,
-      Collection<DistributedSystemService> services) {
+                                Collection<DistributedSystemService> services) {
     logger.info("initializing InternalDataSerializer with {} services", services.size());
     if (distributionConfig.getValidateSerializableObjects()) {
       if (!ClassUtils.isClassAvailable("sun.misc.ObjectInputFilter")
@@ -281,11 +342,9 @@ public abstract class InternalDataSerializer extends DataSerializer {
     serializationFilter = defaultSerializationFilter;
   }
 
-
   /**
    * {@link DistributedSystemService}s that need to acceptlist Serializable objects can use this to
-   * read them from a file and then return them via
-   * {@link DistributedSystemService#getSerializationAcceptlist}
+   * read them from a file and then return them via {@link DistributedSystemService#getSerializationAcceptlist}
    */
   public static Collection<String> loadClassNames(URL sanctionedSerializables) throws IOException {
     Collection<String> result = new ArrayList(1000);
@@ -311,25 +370,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
   }
 
-
-  /**
-   * Any time new serialization format is added then a new enum needs to be added here.
-   *
-   * @since GemFire 6.6.2
-   */
-  private enum SERIALIZATION_VERSION {
-    vINVALID,
-    // includes 6.6.0.x and 6.6.1.x. Note that no serialization changes were made in 6.6 until 6.6.2
-    v660,
-    // 6.6.2.x or later NOTE if you add a new constant make sure and update "latestVersion".
-    v662
-  }
-
-  /**
-   * Change this constant to be the last one in SERIALIZATION_VERSION
-   */
-  private static final SERIALIZATION_VERSION latestVersion = SERIALIZATION_VERSION.v662;
-
   private static SERIALIZATION_VERSION calculateSerializationVersion() {
     if (serializationVersionTxt == null || serializationVersionTxt.isEmpty()) {
       return latestVersion;
@@ -343,8 +383,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
     }
   }
 
-  private static final SERIALIZATION_VERSION serializationVersion = calculateSerializationVersion();
-
   public static boolean is662SerializationEnabled() {
     return serializationVersion.ordinal() >= SERIALIZATION_VERSION.v662.ordinal();
   }
@@ -358,10 +396,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
     }
   }
 
-  static {
-    initializeWellKnownSerializers();
-  }
-
   private static void initializeWellKnownSerializers() {
     // ArrayBlockingQueue does not have zero-arg constructor
     // LinkedBlockingQueue does have zero-arg constructor but no way to get capacity
@@ -769,42 +803,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
   }
 
   /**
-   * Maps the id of a serializer to its {@code DataSerializer}.
-   */
-  private static final ConcurrentMap/* <Integer, DataSerializer|Marker> */ idsToSerializers =
-      new ConcurrentHashMap();
-
-  /**
-   * Contains the classnames of the data serializers (and not the supported classes) not yet loaded
-   * into the vm as keys and their corresponding holder instances as values.
-   */
-  private static final ConcurrentHashMap<String, SerializerAttributesHolder> dsClassesToHolders =
-      new ConcurrentHashMap<>();
-
-  /**
-   * Contains the id of the data serializers not yet loaded into the vm as keys and their
-   * corresponding holder instances as values.
-   */
-  private static final ConcurrentHashMap<Integer, SerializerAttributesHolder> idsToHolders =
-      new ConcurrentHashMap<>();
-
-  /**
-   * Contains the classnames of supported classes as keys and their corresponding
-   * SerializerAttributesHolder instances as values. This applies only to the data serializers which
-   * have not been loaded into the vm.
-   */
-  private static final ConcurrentHashMap<String, SerializerAttributesHolder> supportedClassesToHolders =
-      new ConcurrentHashMap<>();
-
-  /**
-   * {@code RegistrationListener}s that receive callbacks when {@code DataSerializer}s and {@code
-   * Instantiator}s are registered. Note: copy-on-write access used for this set
-   */
-  private static volatile Set listeners = new HashSet();
-
-  private static final Object listenersSync = new Object();
-
-  /**
    * Convert the given unsigned byte to an int. The returned value will be in the range [0..255]
    * inclusive
    */
@@ -812,17 +810,16 @@ public abstract class InternalDataSerializer extends DataSerializer {
     return ub & 0xFF;
   }
 
-  public static void setOldClientSupportService(final OldClientSupportService svc) {
-    oldClientSupportService = svc;
-  }
-
   public static OldClientSupportService getOldClientSupportService() {
     return oldClientSupportService;
   }
 
+  public static void setOldClientSupportService(final OldClientSupportService svc) {
+    oldClientSupportService = svc;
+  }
+
   /**
    * Instantiates an instance of {@code DataSerializer}
-   *
    * @throws IllegalArgumentException If the class can't be instantiated
    * @see DataSerializer#register(Class)
    */
@@ -839,11 +836,11 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
     } catch (NoSuchMethodException ignored) {
       StringId s = LocalizedStrings.DataSerializer_CLASS_0_DOES_NOT_HAVE_A_ZEROARGUMENT_CONSTRUCTOR;
-      Object[] args = new Object[] {c.getName()};
+      Object[] args = new Object[]{c.getName()};
       if (c.getDeclaringClass() != null) {
         s =
             LocalizedStrings.DataSerializer_CLASS_0_DOES_NOT_HAVE_A_ZEROARGUMENT_CONSTRUCTOR_IT_IS_AN_INNER_CLASS_OF_1_SHOULD_IT_BE_A_STATIC_INNER_CLASS;
-        args = new Object[] {c.getName(), c.getDeclaringClass()};
+        args = new Object[]{c.getName(), c.getDeclaringClass()};
       }
       throw new IllegalArgumentException(s.toLocalizedString(args));
     }
@@ -875,7 +872,7 @@ public abstract class InternalDataSerializer extends DataSerializer {
   }
 
   public static DataSerializer register(Class c, boolean distribute, EventID eventId,
-      ClientProxyMembershipID context) {
+                                        ClientProxyMembershipID context) {
     DataSerializer s = newInstance(c);
     // This method is only called when server connection and CacheClientUpdaterThread
     s.setEventId(eventId);
@@ -885,9 +882,8 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
   /**
    * Registers a {@code DataSerializer} instance with the data serialization framework.
-   *
    * @param distribute Should the registered {@code DataSerializer} be distributed to other members
-   *        of the distributed system?
+   * of the distributed system?
    * @see DataSerializer#register(Class)
    */
   public static DataSerializer register(Class c, boolean distribute) {
@@ -945,7 +941,7 @@ public abstract class InternalDataSerializer extends DataSerializer {
           DataSerializer other = (DataSerializer) oldSerializer;
           throw new IllegalStateException(
               LocalizedStrings.InternalDataSerializer_A_DATASERIALIZER_OF_CLASS_0_IS_ALREADY_REGISTERED_WITH_ID_1_SO_THE_DATASERIALIZER_OF_CLASS_2_COULD_NOT_BE_REGISTERED
-                  .toLocalizedString(new Object[] {other.getClass().getName(), other.getId()}));
+                  .toLocalizedString(new Object[]{other.getClass().getName(), other.getId()}));
         }
       }
     } while (retry);
@@ -1018,7 +1014,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
   /**
    * Marks a {@code DataSerializer} className for registration with the data serialization framework
    * if and when it is needed. Does not necessarily load the classes into this VM.
-   *
    * @param className Name of the DataSerializer class.
    * @param distribute If true, distribute this data serializer.
    * @param eventId Event id
@@ -1026,7 +1021,7 @@ public abstract class InternalDataSerializer extends DataSerializer {
    * @see DataSerializer#register(Class)
    */
   public static void register(String className, boolean distribute, EventID eventId,
-      ClientProxyMembershipID proxyId, int id) {
+                              ClientProxyMembershipID proxyId, int id) {
     register(className, distribute,
         new SerializerAttributesHolder(className, eventId, proxyId, id));
   }
@@ -1034,7 +1029,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
   /**
    * Marks a {@code DataSerializer} className for registration with the data serialization
    * framework. Does not necessarily load the classes into this VM.
-   *
    * @param distribute If true, distribute this data serializer.
    * @see DataSerializer#register(Class)
    */
@@ -1043,7 +1037,7 @@ public abstract class InternalDataSerializer extends DataSerializer {
   }
 
   private static void register(String className, boolean distribute,
-      SerializerAttributesHolder holder) {
+                               SerializerAttributesHolder holder) {
     if (StringUtils.isBlank(className)) {
       throw new IllegalArgumentException("Class name cannot be null or empty.");
     }
@@ -1053,7 +1047,7 @@ public abstract class InternalDataSerializer extends DataSerializer {
       if (oldValue.getId() != 0 && holder.getId() != 0 && oldValue.getId() != holder.getId()) {
         throw new IllegalStateException(
             LocalizedStrings.InternalDataSerializer_A_DATASERIALIZER_OF_CLASS_0_IS_ALREADY_REGISTERED_WITH_ID_1_SO_THE_DATASERIALIZER_OF_CLASS_2_COULD_NOT_BE_REGISTERED
-                .toLocalizedString(new Object[] {oldValue.getClass().getName(), oldValue.getId()}));
+                .toLocalizedString(new Object[]{oldValue.getClass().getName(), oldValue.getId()}));
       }
     }
 
@@ -1076,7 +1070,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
    * classes they support. The DataSerializers are registered as "holders" to avoid loading the
    * actual classes until they're needed. This method registers the names of classes supported by
    * the DataSerializers
-   *
    * @param map The classes returned by DataSerializer.supportedClasses()
    */
   public static void updateSupportedClassesMap(Map<Integer, List<String>> map) {
@@ -1097,53 +1090,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
     }
   }
 
-  /**
-   * A SerializerAttributesHolder holds information required to load a DataSerializer and exists to
-   * allow client/server connections to be created more quickly than they would if the
-   * DataSerializer information downloaded from the server were used to immediately load the
-   * corresponding classes.
-   */
-  public static class SerializerAttributesHolder {
-    private String className = "";
-    private EventID eventId = null;
-    private ClientProxyMembershipID proxyId = null;
-    private int id = 0;
-
-    SerializerAttributesHolder() {}
-
-    SerializerAttributesHolder(String name, EventID event, ClientProxyMembershipID proxy, int id) {
-      this.className = name;
-      this.eventId = event;
-      this.proxyId = proxy;
-      this.id = id;
-    }
-
-    /**
-     * @return String the classname of the data serializer this instance represents.
-     */
-    public String getClassName() {
-      return this.className;
-    }
-
-    public EventID getEventId() {
-      return this.eventId;
-    }
-
-    public ClientProxyMembershipID getProxyId() {
-      return this.proxyId;
-    }
-
-    public int getId() {
-      return this.id;
-    }
-
-    @Override
-    public String toString() {
-      return "SerializerAttributesHolder[name=" + this.className + ",id=" + this.id + ",eventId="
-          + this.eventId + ']';
-    }
-  }
-
   private static void sendRegistrationMessageToServers(DataSerializer dataSerializer) {
     PoolManagerImpl.allPoolsRegisterDataSerializers(dataSerializer);
   }
@@ -1178,7 +1124,7 @@ public abstract class InternalDataSerializer extends DataSerializer {
         new ClientDataSerializerMessage(EnumListenerEvent.AFTER_REGISTER_DATASERIALIZER,
             serializedDataSerializer, (ClientProxyMembershipID) dataSerializer.getContext(),
             (EventID) dataSerializer.getEventId(),
-            new Class[][] {dataSerializer.getSupportedClasses()});
+            new Class[][]{dataSerializer.getSupportedClasses()});
     // Deliver it to all the clients
     CacheClientNotifier.routeClientMessage(clientDataSerializerMessage);
   }
@@ -1353,7 +1299,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
   /**
    * Returns all the data serializers in this vm. This method, unlike {@link #getSerializers()},
    * does not force loading of the data serializers which were not loaded in the vm earlier.
-   *
    * @return Array of {@link SerializerAttributesHolder}
    */
   public static SerializerAttributesHolder[] getSerializersForDistribution() {
@@ -1427,7 +1372,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
   /**
    * Read the data from in and register it with this class. TODO: loadRegistrations is unused
-   *
    * @throws IllegalArgumentException if a registration fails
    */
   public static void loadRegistrations(DataInput in) throws IOException {
@@ -1471,7 +1415,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
   /**
    * Alerts all {@code RegistrationListener}s that a new {@code DataSerializer} has been registered
-   *
    * @see InternalDataSerializer.RegistrationListener#newDataSerializer
    */
   private static void fireNewDataSerializer(DataSerializer ds) {
@@ -1483,7 +1426,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
   /**
    * Alerts all {@code RegistrationListener}s that a new {@code Instantiator} has been registered
-   *
    * @see InternalDataSerializer.RegistrationListener#newInstantiator
    */
   static void fireNewInstantiator(Instantiator instantiator) {
@@ -1575,18 +1517,16 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
   /**
    * Data serializes an instance of a well-known class to the given {@code DataOutput}.
-   *
    * @return {@code true} if {@code o} was actually written to {@code out}
    */
   private static boolean writeWellKnownObject(Object o, DataOutput out,
-      boolean ensurePdxCompatibility) throws IOException {
+                                              boolean ensurePdxCompatibility) throws IOException {
     return writeUserObject(o, out, ensurePdxCompatibility);
   }
 
   /**
    * Data serializes an instance of a "user class" (that is, a class that can be handled by a
    * registered {@code DataSerializer}) to the given {@code DataOutput}.
-   *
    * @return {@code true} if {@code o} was written to {@code out}.
    */
   private static boolean writeUserObject(Object o, DataOutput out, boolean ensurePdxCompatibility)
@@ -1670,7 +1610,7 @@ public abstract class InternalDataSerializer extends DataSerializer {
       return true;
     } else if (is662SerializationEnabled()
         && (o.getClass().isEnum()/* for bug 52271 */ || (o.getClass().getSuperclass() != null
-            && o.getClass().getSuperclass().isEnum()))) {
+        && o.getClass().getSuperclass().isEnum()))) {
       if (isPdxSerializationInProgress()) {
         writePdxEnum((Enum<?>) o, out);
       } else {
@@ -1684,7 +1624,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
     }
   }
 
-
   public static boolean autoSerialized(Object o, DataOutput out) throws IOException {
     AutoSerializableManager asm = TypeRegistry.getAutoSerializableManager();
     if (asm != null) {
@@ -1767,7 +1706,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
   /**
    * Reads an object that was serialized by a customer ("user") {@code DataSerializer} from the
    * given {@code DataInput}.
-   *
    * @throws IOException If the serializer that can deserialize the object is not registered.
    */
   private static Object readUserObject(DataInput in, int serializerId)
@@ -1776,7 +1714,7 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
     if (serializer == null) {
       throw new IOException(LocalizedStrings.DataSerializer_SERIALIZER_0_IS_NOT_REGISTERED
-          .toLocalizedString(new Object[] {serializerId}));
+          .toLocalizedString(new Object[]{serializerId}));
     }
 
     return serializer.fromData(in);
@@ -1784,25 +1722,21 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
   /**
    * Checks to make sure a {@code DataOutput} is not {@code null}.
-   *
    * @throws NullPointerException If {@code out} is {@code null}
    */
   public static void checkOut(DataOutput out) {
     if (out == null) {
-      String s = "Null DataOutput";
-      throw new NullPointerException(s);
+      throw new NullPointerException("Null DataOutput");
     }
   }
 
   /**
    * Checks to make sure a {@code DataInput} is not {@code null}.
-   *
    * @throws NullPointerException If {@code in} is {@code null}
    */
   public static void checkIn(DataInput in) {
     if (in == null) {
-      String s = "Null DataInput";
-      throw new NullPointerException(s);
+      throw new NullPointerException("Null DataInput");
     }
   }
 
@@ -1811,7 +1745,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
    * <P>
    * This method is internal because its semantics (that is, its ability to write any kind of {@code
    * Set}) are different from the {@code write}XXX methods of the external {@code DataSerializer}.
-   *
    * @throws IOException A problem occurs while writing to {@code out}
    * @see #readSet
    * @since GemFire 4.0
@@ -1819,29 +1752,24 @@ public abstract class InternalDataSerializer extends DataSerializer {
   public static void writeSet(Collection<?> set, DataOutput out) throws IOException {
     checkOut(out);
 
-    int size;
-    if (set == null) {
-      size = -1;
-    } else {
+    int size = -1;
+    if (set != null) {
       size = set.size();
     }
     writeArrayLength(size, out);
     if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
       logger.trace(LogMarker.SERIALIZER_VERBOSE, "Writing HashSet with {} elements: {}", size, set);
     }
-    if (size > 0) {
-      for (Object element : set) {
-        writeObject(element, out);
-      }
+    for (Object element : set) {
+      writeObject(element, out);
     }
   }
 
   /**
    * Reads a {@code Set} from a {@code DataInput}.
-   *
    * @throws IOException A problem occurs while writing to {@code out}
    * @throws ClassNotFoundException The class of one of the {@code HashSet}'s elements cannot be
-   *         found.
+   * found.
    * @see #writeSet
    * @since GemFire 4.0
    */
@@ -1852,7 +1780,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
   /**
    * Reads a {@code Set} from a {@code DataInput} into the given non-null collection. Returns true
    * if collection read is non-null else returns false. TODO: readCollection is unused
-   *
    * @throws IOException A problem occurs while reading from {@code in}
    * @throws ClassNotFoundException The class of one of the {@code Set}'s elements cannot be found.
    * @see #writeSet
@@ -1879,7 +1806,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
   /**
    * write a set of Long objects
-   *
    * @param set the set of Long objects
    * @param hasLongIDs if false, write only ints, not longs
    * @param out the output stream
@@ -1922,7 +1848,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
   /**
    * write a set of Long objects TODO: writeListOfLongs is unused
-   *
    * @param list the set of Long objects
    * @param hasLongIDs if false, write only ints, not longs
    * @param out the output stream
@@ -1963,7 +1888,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
     }
   }
 
-
   /**
    * Writes the type code for a primitive type Class to {@code DataOutput}.
    */
@@ -1996,48 +1920,38 @@ public abstract class InternalDataSerializer extends DataSerializer {
   }
 
   public static Class decodePrimitiveClass(byte typeCode) {
-    if (typeCode == DSCODE.BOOLEAN_TYPE.toByte()) {
-      return Boolean.TYPE;
-    }
-    if (typeCode == DSCODE.CHARACTER_TYPE.toByte()) {
-      return Character.TYPE;
-    }
-    if (typeCode == DSCODE.BYTE_TYPE.toByte()) {
-      return Byte.TYPE;
-    }
-    if (typeCode == DSCODE.SHORT_TYPE.toByte()) {
-      return Short.TYPE;
-    }
-    if (typeCode == DSCODE.INTEGER_TYPE.toByte()) {
-      return Integer.TYPE;
-    }
-    if (typeCode == DSCODE.LONG_TYPE.toByte()) {
-      return Long.TYPE;
-    }
-    if (typeCode == DSCODE.FLOAT_TYPE.toByte()) {
-      return Float.TYPE;
-    }
-    if (typeCode == DSCODE.DOUBLE_TYPE.toByte()) {
-      return Double.TYPE;
-    }
-    if (typeCode == DSCODE.VOID_TYPE.toByte()) {
-      return Void.TYPE;
-    }
-    if (typeCode == DSCODE.NULL.toByte()) {
-      return null;
+    DSCODE dscode = DscodeHelper.toDSCODE(typeCode);
+    switch (dscode) {
+      case BOOLEAN_TYPE:
+        return Boolean.TYPE;
+      case CHARACTER_TYPE:
+        return Character.TYPE;
+      case BYTE_TYPE:
+        return Byte.TYPE;
+      case SHORT_TYPE:
+        return Short.TYPE;
+      case INTEGER_TYPE:
+        return Integer.TYPE;
+      case LONG_TYPE:
+        return Long.TYPE;
+      case FLOAT_TYPE:
+        return Float.TYPE;
+      case DOUBLE_TYPE:
+        return Double.TYPE;
+      case VOID_TYPE:
+        return Void.TYPE;
+      case NULL:
+        return null;
+      default:
+        throw new InternalGemFireError(
+            LocalizedStrings.InternalDataSerializer_UNEXPECTED_TYPECODE_0
+                .toLocalizedString(typeCode));
     }
-    throw new InternalGemFireError(
-        LocalizedStrings.InternalDataSerializer_UNEXPECTED_TYPECODE_0.toLocalizedString(typeCode));
-  }
 
-  private static final byte TIME_UNIT_NANOSECONDS = -1;
-  private static final byte TIME_UNIT_MICROSECONDS = -2;
-  private static final byte TIME_UNIT_MILLISECONDS = -3;
-  private static final byte TIME_UNIT_SECONDS = -4;
+  }
 
   /**
    * Reads a {@code TimeUnit} from a {@code DataInput}.
-   *
    * @throws IOException A problem occurs while writing to {@code out}
    */
   private static TimeUnit readTimeUnit(DataInput in) throws IOException {
@@ -2146,10 +2060,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
     return result;
   }
 
-  private static final ConcurrentMap dsfidToClassMap =
-      logger.isTraceEnabled(LogMarker.SERIALIZER_WRITE_DSFID_VERBOSE) ? new ConcurrentHashMap()
-          : null;
-
   public static void writeUserDataSerializableHeader(int classId, DataOutput out)
       throws IOException {
     if (classId <= Byte.MAX_VALUE && classId >= Byte.MIN_VALUE) {
@@ -2166,7 +2076,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
   /**
    * Writes given number of characters from array of {@code char}s to a {@code DataOutput}.
-   *
    * @throws IOException A problem occurs while writing to {@code out}
    * @see DataSerializer#readCharArray
    * @since GemFire 6.6
@@ -2190,7 +2099,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
   /**
    * returns true if the byte array is the serialized form of a null reference
-   *
    * @param serializedForm the serialized byte array
    */
   public static boolean isSerializedNull(byte[] serializedForm) {
@@ -2357,7 +2265,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
   /**
    * write an object in java Serializable form with a SERIALIZABLE DSCODE so that it can be
    * deserialized with DataSerializer.readObject()
-   *
    * @param o the object to serialize
    * @param out the data output to serialize to
    */
@@ -2412,7 +2319,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
    * DataSerializable. It will invoke the correct toData method based on the class's version
    * information. This method does not write information about the class of the object. When
    * deserializing use the method invokeFromData to read the contents of the object.
-   *
    * @param ds the object to write
    * @param out the output stream.
    */
@@ -2422,7 +2328,7 @@ public abstract class InternalDataSerializer extends DataSerializer {
       boolean invoked = false;
       Version v = InternalDataSerializer.getVersionForDataStreamOrNull(out);
 
-      if (v != null && v != Version.CURRENT) {
+      if (Version.CURRENT != v && v != null) {
         // get versions where DataOutput was upgraded
         Version[] versions = null;
         if (ds instanceof SerializationVersions) {
@@ -2431,12 +2337,12 @@ public abstract class InternalDataSerializer extends DataSerializer {
         }
         // check if the version of the peer or diskstore is different and
         // there has been a change in the message
-        if (versions != null && versions.length > 0) {
+        if (versions != null) {
           for (Version version : versions) {
             // if peer version is less than the greatest upgraded version
             if (v.compareTo(version) < 0) {
               ds.getClass().getMethod("toDataPre_" + version.getMethodSuffix(),
-                  new Class[] {DataOutput.class}).invoke(ds, out);
+                  new Class[]{DataOutput.class}).invoke(ds, out);
               invoked = true;
               break;
             }
@@ -2486,7 +2392,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
    * DataSerializable. It will invoke the correct fromData method based on the class's version
    * information. This method does not read information about the class of the object. When
    * serializing use the method invokeToData to write the contents of the object.
-   *
    * @param ds the object to write
    * @param in the input stream.
    */
@@ -2495,7 +2400,7 @@ public abstract class InternalDataSerializer extends DataSerializer {
     try {
       boolean invoked = false;
       Version v = InternalDataSerializer.getVersionForDataStreamOrNull(in);
-      if (v != null && v != Version.CURRENT) {
+      if (Version.CURRENT != v && v != null) {
         // get versions where DataOutput was upgraded
         Version[] versions = null;
         if (ds instanceof SerializationVersions) {
@@ -2504,12 +2409,12 @@ public abstract class InternalDataSerializer extends DataSerializer {
         }
         // check if the version of the peer or diskstore is different and
         // there has been a change in the message
-        if (versions != null && versions.length > 0) {
+        if (versions != null) {
           for (Version version : versions) {
             // if peer version is less than the greatest upgraded version
             if (v.compareTo(version) < 0) {
               ds.getClass().getMethod("fromDataPre" + '_' + version.getMethodSuffix(),
-                  new Class[] {DataInput.class}).invoke(ds, in);
+                  new Class[]{DataInput.class}).invoke(ds, in);
               invoked = true;
               break;
             }
@@ -2534,7 +2439,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
     }
   }
 
-
   private static Object readDataSerializable(final DataInput in)
       throws IOException, ClassNotFoundException {
     Class c = readClass(in);
@@ -2642,25 +2546,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
     }
   }
 
-  // array is null
-  public static final byte NULL_ARRAY = -1;
-
-  /**
-   * array len encoded as unsigned short in next 2 bytes
-   *
-   * @since GemFire 5.7
-   */
-  private static final byte SHORT_ARRAY_LEN = -2;
-
-  /**
-   * array len encoded as int in next 4 bytes
-   *
-   * @since GemFire 5.7
-   */
-  public static final byte INT_ARRAY_LEN = -3;
-
-  private static final int MAX_BYTE_ARRAY_LEN = (byte) -4 & 0xFF;
-
   public static void writeArrayLength(int len, DataOutput out) throws IOException {
     if (len == -1) {
       out.writeByte(NULL_ARRAY);
@@ -2683,9 +2568,9 @@ public abstract class InternalDataSerializer extends DataSerializer {
       int result = ubyteToInt(code);
       if (result > MAX_BYTE_ARRAY_LEN) {
         if (code == SHORT_ARRAY_LEN) {
-          result = in.readUnsignedShort();
+          return in.readUnsignedShort();
         } else if (code == INT_ARRAY_LEN) {
-          result = in.readInt();
+          return in.readInt();
         } else {
           throw new IllegalStateException("unexpected array length code=" + code);
         }
@@ -2694,114 +2579,107 @@ public abstract class InternalDataSerializer extends DataSerializer {
     }
   }
 
-  /**
-   * Serializes a list of Integers. The argument may be null. Deserialize with
-   * readListOfIntegers().
-   *
-   * TODO: writeListOfIntegers is unused
-   */
-  public void writeListOfIntegers(List<Integer> list, DataOutput out) throws IOException {
-    int size;
-    if (list == null) {
-      size = -1;
-    } else {
-      size = list.size();
-    }
-    InternalDataSerializer.writeArrayLength(size, out);
-    if (size > 0) {
-      for (int i = 0; i < size; i++) {
-        out.writeInt(list.get(i));
-      }
+  private static Object readDSFID(final DataInput in, DSCODE dscode)
+      throws IOException, ClassNotFoundException {
+    if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
+      logger.trace(LogMarker.SERIALIZER_VERBOSE, "readDSFID: header={}", dscode);
+    }
+    switch (dscode) {
+      case DS_FIXED_ID_BYTE:
+        return DSFIDFactory.create(in.readByte(), in);
+      case DS_FIXED_ID_SHORT:
+        return DSFIDFactory.create(in.readShort(), in);
+      case DS_NO_FIXED_ID:
+        return readDataSerializableFixedID(in);
+      case DS_FIXED_ID_INT:
+        return DSFIDFactory.create(in.readInt(), in);
+      default:
+        throw new IllegalStateException("unexpected byte: " + dscode + " while reading dsfid");
     }
   }
 
   public static Object readDSFID(final DataInput in) throws IOException, ClassNotFoundException {
     checkIn(in);
-    byte header = in.readByte();
-    if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
-      logger.trace(LogMarker.SERIALIZER_VERBOSE, "readDSFID: header={}", header);
-    }
-    if (header == DSCODE.DS_FIXED_ID_BYTE.toByte()) {
-      return DSFIDFactory.create(in.readByte(), in);
-    } else if (header == DSCODE.DS_FIXED_ID_SHORT.toByte()) {
-      return DSFIDFactory.create(in.readShort(), in);
-    } else if (header == DSCODE.DS_NO_FIXED_ID.toByte()) {
-      return readDataSerializableFixedID(in);
-    } else if (header == DSCODE.DS_FIXED_ID_INT.toByte()) {
-      return DSFIDFactory.create(in.readInt(), in);
-    } else {
-      throw new IllegalStateException("unexpected byte: " + header + " while reading dsfid");
+    return readDSFID(in, DscodeHelper.toDSCODE(in.readByte()));
+  }
+
+  private static int readDSFIDHeader(final DataInput in, DSCODE dscode) throws IOException {
+    switch (dscode) {
+      case DS_FIXED_ID_BYTE:
+        return in.readByte();
+      case DS_FIXED_ID_SHORT:
+        return in.readShort();
+      case DS_FIXED_ID_INT:
+        return in.readInt();
+      default:
+        throw new IllegalStateException("unexpected byte: " + dscode + " while reading dsfid");
     }
   }
 
   public static int readDSFIDHeader(final DataInput in) throws IOException {
     checkIn(in);
-    byte header = in.readByte();
-    if (header == DSCODE.DS_FIXED_ID_BYTE.toByte()) {
-      return in.readByte();
-    } else if (header == DSCODE.DS_FIXED_ID_SHORT.toByte()) {
-      return in.readShort();
-    } else if (header == DSCODE.DS_NO_FIXED_ID.toByte()) {
-      // is that correct??
-      return Integer.MAX_VALUE;
-    } else if (header == DSCODE.DS_FIXED_ID_INT.toByte()) {
-      return in.readInt();
-    } else {
-      throw new IllegalStateException("unexpected byte: " + header + " while reading dsfid");
-    }
+    return readDSFIDHeader(in, DscodeHelper.toDSCODE(in.readByte()));
   }
 
   /**
    * Reads an instance of {@code String} from a {@code DataInput} given the header byte already
    * being read. The return value may be {@code null}.
-   *
    * @throws IOException A problem occurs while reading from {@code in}
    * @since GemFire 5.7
    */
-  public static String readString(DataInput in, byte header) throws IOException {
-    if (header == DSCODE.STRING_BYTES.toByte()) {
-      int len = in.readUnsignedShort();
-      if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
-        logger.trace(LogMarker.SERIALIZER_VERBOSE, "Reading STRING_BYTES of len={}", len);
-      }
-      byte[] buf = new byte[len];
-      in.readFully(buf, 0, len);
-      return new String(buf, 0); // intentionally using deprecated constructor
-    } else if (header == DSCODE.STRING.toByte()) {
-      if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
-        logger.trace(LogMarker.SERIALIZER_VERBOSE, "Reading utf STRING");
-      }
-      return in.readUTF();
-    } else if (header == DSCODE.NULL_STRING.toByte()) {
-      if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
-        logger.trace(LogMarker.SERIALIZER_VERBOSE, "Reading NULL_STRING");
-      }
-      return null;
-    } else if (header == DSCODE.HUGE_STRING_BYTES.toByte()) {
-      int len = in.readInt();
-      if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
-        logger.trace(LogMarker.SERIALIZER_VERBOSE, "Reading HUGE_STRING_BYTES of len={}", len);
-      }
-      byte[] buf = new byte[len];
-      in.readFully(buf, 0, len);
-      return new String(buf, 0); // intentionally using deprecated constructor
-    } else if (header == DSCODE.HUGE_STRING.toByte()) {
-      int len = in.readInt();
-      if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
-        logger.trace(LogMarker.SERIALIZER_VERBOSE, "Reading HUGE_STRING of len={}", len);
-      }
-      char[] buf = new char[len];
-      for (int i = 0; i < len; i++) {
-        buf[i] = in.readChar();
+  private static String readString(DataInput in, DSCODE dscode) throws IOException {
+    switch (dscode) {
+      case STRING_BYTES:
+        return readStringBytesFromDataInput(in, in.readUnsignedShort());
+      case STRING:
+        return readStringUTFFromDataInput(in);
+      case NULL_STRING: {
+        if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
+          logger.trace(LogMarker.SERIALIZER_VERBOSE, "Reading NULL_STRING");
+        }
+        return null;
       }
-      return new String(buf);
-    } else {
-      String s = "Unknown String header " + header;
-      throw new IOException(s);
+      case HUGE_STRING_BYTES:
+        return readStringBytesFromDataInput(in, in.readInt());
+      case HUGE_STRING:
+        return readHugeStringFromDataInput(in);
+      default:
+        throw new IOException("Unknown String header " + dscode);
     }
   }
 
-  private static DataSerializer dvddeserializer;
+  private static String readHugeStringFromDataInput(DataInput in) throws IOException {
+    int len = in.readInt();
+    if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
+      logger.trace(LogMarker.SERIALIZER_VERBOSE, "Reading HUGE_STRING of len={}", len);
+    }
+    char[] buf = new char[len];
+    for (int i = 0; i < len; i++) {
+      buf[i] = in.readChar();
+    }
+    return new String(buf);
+  }
+
+  private static String readStringUTFFromDataInput(DataInput in) throws IOException {
+    if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
+      logger.trace(LogMarker.SERIALIZER_VERBOSE, "Reading utf STRING");
+    }
+    return in.readUTF();
+  }
+
+  private static String readStringBytesFromDataInput(DataInput dataInput, int len)
+      throws IOException {
+    if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
+      logger.trace(LogMarker.SERIALIZER_VERBOSE, "Reading STRING_BYTES of len={}", len);
+    }
+    byte[] buf = new byte[len];
+    dataInput.readFully(buf, 0, len);
+    return new String(buf, 0); // intentionally using deprecated constructor
+  }
+
+  public static String readString(DataInput in, byte header) throws IOException {
+    return readString(in, DscodeHelper.toDSCODE(header));
+  }
 
   // TODO: registerDVDDeserializer is unused
   public static void registerDVDDeserializer(DataSerializer dvddeslzr) {
@@ -2810,7 +2688,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
   /**
    * Just like readObject but make sure and pdx deserialized is not a PdxInstance.
-   *
    * @since GemFire 6.6.2
    */
   public static <T> T readNonPdxInstanceObject(final DataInput in)
@@ -2834,260 +2711,207 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
     // Read the header byte
     byte header = in.readByte();
+    DSCODE headerDSCode = DscodeHelper.toDSCODE(header);
+
     if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
       logger.trace(LogMarker.SERIALIZER_VERBOSE, "basicReadObject: header={}", header);
     }
-    if (header == DSCODE.DS_FIXED_ID_BYTE.toByte()) {
-      return DSFIDFactory.create(in.readByte(), in);
-    }
-    if (header == DSCODE.DS_FIXED_ID_SHORT.toByte()) {
-      return DSFIDFactory.create(in.readShort(), in);
-    }
-    if (header == DSCODE.DS_FIXED_ID_INT.toByte()) {
-      return DSFIDFactory.create(in.readInt(), in);
-    }
-    if (header == DSCODE.DS_NO_FIXED_ID.toByte()) {
-      return readDataSerializableFixedID(in);
-    }
-    if (header == DSCODE.NULL.toByte()) {
-      return null;
-    }
-    if (header == DSCODE.NULL_STRING.toByte() || header == DSCODE.STRING.toByte()
-        || header == DSCODE.HUGE_STRING.toByte() || header == DSCODE.STRING_BYTES.toByte()
-        || header == DSCODE.HUGE_STRING_BYTES.toByte()) {
-      return readString(in, header);
-    }
-    if (header == DSCODE.CLASS.toByte()) {
-      return readClass(in);
-    }
-    if (header == DSCODE.DATE.toByte()) {
-      return readDate(in);
-    }
-    if (header == DSCODE.FILE.toByte()) {
-      return readFile(in);
-    }
-    if (header == DSCODE.INET_ADDRESS.toByte()) {
-      return readInetAddress(in);
-    }
-    if (header == DSCODE.BOOLEAN.toByte()) {
-      return readBoolean(in);
-    }
-    if (header == DSCODE.CHARACTER.toByte()) {
-      return readCharacter(in);
-    }
-    if (header == DSCODE.BYTE.toByte()) {
-      return readByte(in);
-    }
-    if (header == DSCODE.SHORT.toByte()) {
-      return readShort(in);
-    }
-    if (header == DSCODE.INTEGER.toByte()) {
-      return readInteger(in);
-    }
-    if (header == DSCODE.LONG.toByte()) {
-      return readLong(in);
-    }
-    if (header == DSCODE.FLOAT.toByte()) {
-      return readFloat(in);
-    }
-    if (header == DSCODE.DOUBLE.toByte()) {
-      return readDouble(in);
-    }
-    if (header == DSCODE.BYTE_ARRAY.toByte()) {
-      return readByteArray(in);
-    }
-    if (header == DSCODE.ARRAY_OF_BYTE_ARRAYS.toByte()) {
-      return readArrayOfByteArrays(in);
-    }
-    if (header == DSCODE.SHORT_ARRAY.toByte()) {
-      return readShortArray(in);
-    }
-    if (header == DSCODE.STRING_ARRAY.toByte()) {
-      return readStringArray(in);
-    }
-    if (header == DSCODE.INT_ARRAY.toByte()) {
-      return readIntArray(in);
-    }
-    if (header == DSCODE.LONG_ARRAY.toByte()) {
-      return readLongArray(in);
-    }
-    if (header == DSCODE.FLOAT_ARRAY.toByte()) {
-      return readFloatArray(in);
-    }
-    if (header == DSCODE.DOUBLE_ARRAY.toByte()) {
-      return readDoubleArray(in);
-    }
-    if (header == DSCODE.BOOLEAN_ARRAY.toByte()) {
-      return readBooleanArray(in);
-    }
-    if (header == DSCODE.CHAR_ARRAY.toByte()) {
-      return readCharArray(in);
-    }
-    if (header == DSCODE.OBJECT_ARRAY.toByte()) {
-      return readObjectArray(in);
-    }
-    if (header == DSCODE.ARRAY_LIST.toByte()) {
-      return readArrayList(in);
-    }
-    if (header == DSCODE.LINKED_LIST.toByte()) {
-      return readLinkedList(in);
-    }
-    if (header == DSCODE.HASH_SET.toByte()) {
-      return readHashSet(in);
-    }
-    if (header == DSCODE.LINKED_HASH_SET.toByte()) {
-      return readLinkedHashSet(in);
-    }
-    if (header == DSCODE.HASH_MAP.toByte()) {
-      return readHashMap(in);
-    }
-    if (header == DSCODE.IDENTITY_HASH_MAP.toByte()) {
-      return readIdentityHashMap(in);
-    }
-    if (header == DSCODE.HASH_TABLE.toByte()) {
-      return readHashtable(in);
-    }
-    if (header == DSCODE.CONCURRENT_HASH_MAP.toByte()) {
-      return readConcurrentHashMap(in);
-    }
-    if (header == DSCODE.PROPERTIES.toByte()) {
-      return readProperties(in);
-    }
-    if (header == DSCODE.TIME_UNIT.toByte()) {
-      return readTimeUnit(in);
-    }
-    if (header == DSCODE.USER_CLASS.toByte()) {
-      return readUserObject(in, in.readByte());
-    }
-    if (header == DSCODE.USER_CLASS_2.toByte()) {
-      return readUserObject(in, in.readShort());
-    }
-    if (header == DSCODE.USER_CLASS_4.toByte()) {
-      return readUserObject(in, in.readInt());
-    }
-    if (header == DSCODE.VECTOR.toByte()) {
-      return readVector(in);
-    }
-    if (header == DSCODE.STACK.toByte()) {
-      return readStack(in);
-    }
-    if (header == DSCODE.TREE_MAP.toByte()) {
-      return readTreeMap(in);
-    }
-    if (header == DSCODE.TREE_SET.toByte()) {
-      return readTreeSet(in);
-    }
-    if (header == DSCODE.BOOLEAN_TYPE.toByte()) {
-      return Boolean.TYPE;
-    }
-    if (header == DSCODE.CHARACTER_TYPE.toByte()) {
-      return Character.TYPE;
-    }
-    if (header == DSCODE.BYTE_TYPE.toByte()) {
-      return Byte.TYPE;
-    }
-    if (header == DSCODE.SHORT_TYPE.toByte()) {
-      return Short.TYPE;
-    }
-    if (header == DSCODE.INTEGER_TYPE.toByte()) {
-      return Integer.TYPE;
-    }
-    if (header == DSCODE.LONG_TYPE.toByte()) {
-      return Long.TYPE;
-    }
-    if (header == DSCODE.FLOAT_TYPE.toByte()) {
-      return Float.TYPE;
-    }
-    if (header == DSCODE.DOUBLE_TYPE.toByte()) {
-      return Double.TYPE;
-    }
-    if (header == DSCODE.VOID_TYPE.toByte()) {
-      return Void.TYPE;
-    }
-    if (header == DSCODE.USER_DATA_SERIALIZABLE.toByte()) {
-      return readUserDataSerializable(in, in.readByte());
-    }
-    if (header == DSCODE.USER_DATA_SERIALIZABLE_2.toByte()) {
-      return readUserDataSerializable(in, in.readShort());
-    }
-    if (header == DSCODE.USER_DATA_SERIALIZABLE_4.toByte()) {
-      return readUserDataSerializable(in, in.readInt());
-    }
-    if (header == DSCODE.DATA_SERIALIZABLE.toByte()) {
-      return readDataSerializable(in);
+
+    switch (headerDSCode) {
+      case DS_FIXED_ID_BYTE:
+        return DSFIDFactory.create(in.readByte(), in);
+      case DS_FIXED_ID_SHORT:
+        return DSFIDFactory.create(in.readShort(), in);
+      case DS_FIXED_ID_INT:
+        return DSFIDFactory.create(in.readInt(), in);
+      case DS_NO_FIXED_ID:
+        return readDataSerializableFixedID(in);
+      case NULL:
+        return null;
+      case NULL_STRING:
+        return null;
+      case STRING:
+        return readStringUTFFromDataInput(in);
+      case HUGE_STRING:
+        return readHugeStringFromDataInput(in);
+      case STRING_BYTES:
+        return readStringBytesFromDataInput(in, in.readUnsignedShort());
+      case HUGE_STRING_BYTES:
+        return readStringBytesFromDataInput(in, in.readInt());
+      case CLASS:
+        return readClass(in);
+      case DATE:
+        return readDate(in);
+      case FILE:
+        return readFile(in);
+      case INET_ADDRESS:
+        return readInetAddress(in);
+      case BOOLEAN:
+        return readBoolean(in);
+      case CHARACTER:
+        return readCharacter(in);
+      case BYTE:
+        return readByte(in);
+      case SHORT:
+        return readShort(in);
+      case INTEGER:
+        return readInteger(in);
+      case LONG:
+        return readLong(in);
+      case FLOAT:
+        return readFloat(in);
+      case DOUBLE:
+        return readDouble(in);
+      case BYTE_ARRAY:
+        return readByteArray(in);
+      case ARRAY_OF_BYTE_ARRAYS:
+        return readArrayOfByteArrays(in);
+      case SHORT_ARRAY:
+        return readShortArray(in);
+      case STRING_ARRAY:
+        return readStringArray(in);
+      case INT_ARRAY:
+        return readIntArray(in);
+      case LONG_ARRAY:
+        return readLongArray(in);
+      case FLOAT_ARRAY:
+        return readFloatArray(in);
+      case DOUBLE_ARRAY:
+        return readDoubleArray(in);
+      case BOOLEAN_ARRAY:
+        return readBooleanArray(in);
+      case CHAR_ARRAY:
+        return readCharArray(in);
+      case OBJECT_ARRAY:
+        return readObjectArray(in);
+      case ARRAY_LIST:
+        return readArrayList(in);
+      case LINKED_LIST:
+        return readLinkedList(in);
+      case HASH_SET:
+        return readHashSet(in);
+      case LINKED_HASH_SET:
+        return readLinkedHashSet(in);
+      case HASH_MAP:
+        return readHashMap(in);
+      case IDENTITY_HASH_MAP:
+        return readIdentityHashMap(in);
+      case HASH_TABLE:
+        return readHashtable(in);
+      case CONCURRENT_HASH_MAP:
+        return readConcurrentHashMap(in);
+      case PROPERTIES:
+        return readProperties(in);
+      case TIME_UNIT:
+        return readTimeUnit(in);
+      case USER_CLASS:
+        return readUserObject(in, in.readByte());
+      case USER_CLASS_2:
+        return readUserObject(in, in.readShort());
+      case USER_CLASS_4:
+        return readUserObject(in, in.readInt());
+      case VECTOR:
+        return readVector(in);
+      case STACK:
+        return readStack(in);
+      case TREE_MAP:
+        return readTreeMap(in);
+      case TREE_SET:
+        return readTreeSet(in);
+      case BOOLEAN_TYPE:
+        return Boolean.TYPE;
+      case CHARACTER_TYPE:
+        return Character.TYPE;
+      case BYTE_TYPE:
+        return Byte.TYPE;
+      case SHORT_TYPE:
+        return Short.TYPE;
+      case INTEGER_TYPE:
+        return Integer.TYPE;
+      case LONG_TYPE:
+        return Long.TYPE;
+      case FLOAT_TYPE:
+        return Float.TYPE;
+      case DOUBLE_TYPE:
+        return Double.TYPE;
+      case VOID_TYPE:
+        return Void.TYPE;
+      case USER_DATA_SERIALIZABLE:
+        return readUserDataSerializable(in, in.readByte());
+      case USER_DATA_SERIALIZABLE_2:
+        return readUserDataSerializable(in, in.readShort());
+      case USER_DATA_SERIALIZABLE_4:
+        return readUserDataSerializable(in, in.readInt());
+      case DATA_SERIALIZABLE:
+        return readDataSerializable(in);
+      case SERIALIZABLE:
+        return readSerializable(in);
+      case PDX:
+        return readPdxSerializable(in);
+      case PDX_ENUM:
+        return readPdxEnum(in);
+      case GEMFIRE_ENUM:
+        return readGemFireEnum(in);
+      case PDX_INLINE_ENUM:
+        return readPdxInlineEnum(in);
+      case BIG_INTEGER:
+        return readBigInteger(in);
+      case BIG_DECIMAL:
+        return readBigDecimal(in);
+      case UUID:
+        return readUUID(in);
+      case TIMESTAMP:
+        return readTimestamp(in);
+      default:
+        throw new IOException("Unknown header byte: " + header);
     }
-    if (header == DSCODE.SERIALIZABLE.toByte()) {
-      final boolean isDebugEnabled_SERIALIZER = logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE);
-      Object serializableResult;
-      if (in instanceof DSObjectInputStream) {
-        serializableResult = ((DSObjectInputStream) in).readObject();
+  }
+
+  private static Serializable readSerializable(DataInput in)
+      throws IOException, ClassNotFoundException {
+    final boolean isDebugEnabled_SERIALIZER = logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE);
+    Serializable serializableResult;
+    if (in instanceof DSObjectInputStream) {
+      serializableResult = (Serializable) ((DSObjectInputStream) in).readObject();
+    } else {
+      InputStream stream;
+      if (in instanceof InputStream) {
+        stream = (InputStream) in;
       } else {
-        InputStream stream;
-        if (in instanceof InputStream) {
-          stream = (InputStream) in;
-        } else {
-          stream = new InputStream() {
-            @Override
-            public int read() throws IOException {
-              try {
-                return in.readUnsignedByte(); // fix for bug 47249
-              } catch (EOFException ignored) {
-                return -1;
-              }
+        stream = new InputStream() {
+          @Override
+          public int read() throws IOException {
+            try {
+              return in.readUnsignedByte(); // fix for bug 47249
+            } catch (EOFException ignored) {
+              return -1;
             }
-
-          };
-        }
-
-        ObjectInput ois = new DSObjectInputStream(stream);
-        serializationFilter.setFilterOn((ObjectInputStream) ois);
-        if (stream instanceof VersionedDataStream) {
-          Version v = ((VersionedDataStream) stream).getVersion();
-          if (v != null && v != Version.CURRENT) {
-            ois = new VersionedObjectInput(ois, v);
           }
-        }
 
-        serializableResult = ois.readObject();
+        };
+      }
 
-        if (isDebugEnabled_SERIALIZER) {
-          logger.trace(LogMarker.SERIALIZER_VERBOSE, "Read Serializable object: {}",
-              serializableResult);
+      ObjectInput ois = new DSObjectInputStream(stream);
+      serializationFilter.setFilterOn((ObjectInputStream) ois);
+      if (stream instanceof VersionedDataStream) {
+        Version v = ((VersionedDataStream) stream).getVersion();
+        if (Version.CURRENT != v && v != null) {
+          ois = new VersionedObjectInput(ois, v);
         }
       }
+
+      serializableResult = (Serializable) ois.readObject();
+
       if (isDebugEnabled_SERIALIZER) {
-        logger.trace(LogMarker.SERIALIZER_VERBOSE, "deserialized instanceof {}",
-            serializableResult.getClass());
+        logger.trace(LogMarker.SERIALIZER_VERBOSE, "Read Serializable object: {}",
+            serializableResult);
       }
-      return serializableResult;
     }
-    if (header == DSCODE.PDX.toByte()) {
-      return readPdxSerializable(in);
-    }
-    if (header == DSCODE.PDX_ENUM.toByte()) {
-      return readPdxEnum(in);
-    }
-    if (header == DSCODE.GEMFIRE_ENUM.toByte()) {
-      return readGemFireEnum(in);
-    }
-    if (header == DSCODE.PDX_INLINE_ENUM.toByte()) {
-      return readPdxInlineEnum(in);
-    }
-    if (header == DSCODE.BIG_INTEGER.toByte()) {
-      return readBigInteger(in);
-    }
-    if (header == DSCODE.BIG_DECIMAL.toByte()) {
-      return readBigDecimal(in);
-    }
-    if (header == DSCODE.UUID.toByte()) {
-      return readUUID(in);
-    }
-    if (header == DSCODE.TIMESTAMP.toByte()) {
-      return readTimestamp(in);
+    if (isDebugEnabled_SERIALIZER) {
+      logger.trace(LogMarker.SERIALIZER_VERBOSE, "deserialized instanceof {}",
+          serializableResult.getClass());
     }
-
-    String s = "Unknown header byte: " + header;
-    throw new IOException(s);
+    return serializableResult;
   }
 
   private static Object readUserDataSerializable(final DataInput in, int classId)
@@ -3123,23 +2947,17 @@ public abstract class InternalDataSerializer extends DataSerializer {
     }
   }
 
-  private static final ThreadLocal<Boolean> pdxSerializationInProgress = new ThreadLocal<>();
-
   public static boolean isPdxSerializationInProgress() {
     Boolean v = pdxSerializationInProgress.get();
     return v != null && v;
   }
 
-  public static void setPdxSerializationInProgress(boolean v) {
-    if (v) {
-      pdxSerializationInProgress.set(true);
-    } else {
-      pdxSerializationInProgress.set(false);
-    }
+  public static void setPdxSerializationInProgress(boolean inProgress) {
+    pdxSerializationInProgress.set(inProgress);
   }
 
   public static boolean writePdx(DataOutput out, InternalCache internalCache, Object pdx,
-      PdxSerializer pdxSerializer) throws IOException {
+                                 PdxSerializer pdxSerializer) throws IOException {
     TypeRegistry tr = null;
     if (internalCache != null) {
       tr = internalCache.getPdxRegistry();
@@ -3327,63 +3145,371 @@ public abstract class InternalDataSerializer extends DataSerializer {
     return supportedClassesToHolders;
   }
 
-  /**
-   * A marker object for {@code DataSerializer}s that have not been registered. Using this marker
-   * object allows us to asynchronously send {@code DataSerializer} registration updates. If the
-   * serialized bytes arrive at a VM before the registration message does, the deserializer will
-   * wait an amount of time for the registration message to arrive.
-   */
-  abstract static class Marker {
-    /**
-     * The DataSerializer that is filled in upon registration
-     */
-    protected DataSerializer serializer = null;
-
-    /**
-     * set to true once setSerializer is called.
-     */
-    boolean hasBeenSet = false;
-
-    abstract DataSerializer getSerializer();
-
-    /**
-     * Sets the serializer associated with this marker. It will notify any threads that are waiting
-     * for the serializer to be registered.
-     */
-    void setSerializer(DataSerializer serializer) {
-      synchronized (this) {
-        this.hasBeenSet = true;
-        this.serializer = serializer;
-        this.notifyAll();
+  public static void writeObjectArray(Object[] array, DataOutput out, boolean ensureCompatibility)
+      throws IOException {
+    InternalDataSerializer.checkOut(out);
+    int length = -1;
+    if (array != null) {
+      length = array.length;
+    }
+    InternalDataSerializer.writeArrayLength(length, out);
+    if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
+      logger.trace(LogMarker.SERIALIZER_VERBOSE, "Writing Object array of length {}", length);
+    }
+    if (length >= 0) {
+      writeClass(array.getClass().getComponentType(), out);
+      for (int i = 0; i < length; i++) {
+        basicWriteObject(array[i], out, ensureCompatibility);
       }
     }
   }
 
   /**
-   * A marker object for {@code DataSerializer}s that have not been registered. Using this marker
-   * object allows us to asynchronously send {@code DataSerializer} registration updates. If the
-   * serialized bytes arrive at a VM before the registration message does, the deserializer will
-   * wait an amount of time for the registration message to arrive. Made public for unit test
-   * access.
-   *
-   * @since GemFire 5.7
+   * Write a variable length long the old way (pre 7.0). Use this only in contexts where you might
+   * need to communicate with pre 7.0 members or files.
    */
-  public static class GetMarker extends Marker {
-    /**
-     * Number of milliseconds to wait. Also used by InternalInstantiator. Note that some tests set
-     * this to a small amount to speed up failures. Made public for unit test access.
-     */
-    public static int WAIT_MS = Integer.getInteger(
-        DistributionConfig.GEMFIRE_PREFIX + "InternalDataSerializer.WAIT_MS", 60 * 1000);
+  public static void writeVLOld(long data, DataOutput out) throws IOException {
+    if (data < 0) {
+      Assert.fail("Data expected to be >=0 is " + data);
+    }
+    if (data <= MAX_BYTE_VL) {
+      out.writeByte((byte) data);
+    } else if (data <= 0x7FFF) {
+      // set the sign bit to indicate a short
+      out.write(((int) data >>> 8 | 0x80) & 0xFF);
+      out.write((int) data >>> 0 & 0xFF);
+    } else if (data <= Integer.MAX_VALUE) {
+      out.writeByte(INT_VL);
+      out.writeInt((int) data);
+    } else {
+      out.writeByte(LONG_VL);
+      out.writeLong(data);
+    }
+  }
 
-    /**
-     * Returns the serializer associated with this marker. If the serializer has not been registered
-     * yet, then this method will wait until the serializer is registered. If this method has to
-     * wait for too long, then {@code null} is returned.
-     */
-    @Override
-    DataSerializer getSerializer() {
-      synchronized (this) {
+  /**
+   * Write a variable length long the old way (pre 7.0). Use this only in contexts where you might
+   * need to communicate with pre 7.0 members or files.
+   */
+  public static long readVLOld(DataInput in) throws IOException {
+    byte code = in.readByte();
+    long result;
+    if (code < 0) {
+      // mask off sign bit
+      result = code & 0x7F;
+      result <<= 8;
+      result |= in.readByte() & 0xFF;
+    } else if (code <= MAX_BYTE_VL) {
+      result = code;
+    } else if (code == INT_VL) {
+      result = in.readInt();
+    } else if (code == LONG_VL) {
+      result = in.readLong();
+    } else {
+      throw new IllegalStateException("unexpected variable length code=" + code);
+    }
+    return result;
+  }
+
+  /**
+   * Encode a long as a variable length array.
+   *
+   * This method is appropriate for unsigned integers. For signed integers, negative values will
+   * always consume 10 bytes, so it is recommended to use writeSignedVL instead.
+   *
+   * This is taken from the varint encoding in protobufs (BSD licensed). See
+   * https://developers.google.com/protocol-buffers/docs/encoding
+   */
+  public static void writeUnsignedVL(long data, DataOutput out) throws IOException {
+    while (true) {
+      if ((data & ~0x7FL) == 0) {
+        out.writeByte((int) data);
+        return;
+      } else {
+        out.writeByte((int) data & 0x7F | 0x80);
+        data >>>= 7;
+      }
+    }
+  }
+
+  /**
+   * Decode a long as a variable length array.
+   *
+   * This is taken from the varint encoding in protobufs (BSD licensed). See
+   * https://developers.google.com/protocol-buffers/docs/encoding
+   */
+  public static long readUnsignedVL(DataInput in) throws IOException {
+    int shift = 0;
+    long result = 0;
+    while (shift < 64) {
+      final byte b = in.readByte();
+      result |= (long) (b & 0x7F) << shift;
+      if ((b & 0x80) == 0) {
+        return result;
+      }
+      shift += 7;
+    }
+    throw new GemFireIOException("Malformed variable length integer");
+  }
+
+  /**
+   * Encode a signed long as a variable length array.
+   *
+   * This method is appropriate for signed integers. It uses zig zag encoding to so that negative
+   * numbers will be represented more compactly. For unsigned values, writeUnsignedVL will be more
+   * efficient.
+   */
+  public static void writeSignedVL(long data, DataOutput out) throws IOException {
+    writeUnsignedVL(encodeZigZag64(data), out);
+  }
+
+  /**
+   * Decode a signed long as a variable length array.
+   *
+   * This method is appropriate for signed integers. It uses zig zag encoding to so that negative
+   * numbers will be represented more compactly. For unsigned values, writeUnsignedVL will be more
+   * efficient.
+   */
+  public static long readSignedVL(DataInput in) throws IOException {
+    return decodeZigZag64(readUnsignedVL(in));
+  }
+
+  /**
+   * Decode a ZigZag-encoded 64-bit value. ZigZag encodes signed integers into values that can be
+   * efficiently encoded with varint. (Otherwise, negative values must be sign-extended to 64 bits
+   * to be varint encoded, thus always taking 10 bytes on the wire.)
+   * @param n An unsigned 64-bit integer, stored in a signed int because Java has no explicit
+   * unsigned support.
+   * @return A signed 64-bit integer.
+   *
+   * This is taken from the varint encoding in protobufs (BSD licensed). See
+   * https://developers.google.com/protocol-buffers/docs/encoding
+   */
+  private static long decodeZigZag64(final long n) {
+    return n >>> 1 ^ -(n & 1);
+  }
+
+  /**
+   * Encode a ZigZag-encoded 64-bit value. ZigZag encodes signed integers into values that can be
+   * efficiently encoded with varint. (Otherwise, negative values must be sign-extended to 64 bits
+   * to be varint encoded, thus always taking 10 bytes on the wire.)
+   * @param n A signed 64-bit integer.
+   * @return An unsigned 64-bit integer, stored in a signed int because Java has no explicit
+   * unsigned support.
+   *
+   * This is taken from the varint encoding in protobufs (BSD licensed). See
+   * https://developers.google.com/protocol-buffers/docs/encoding
+   */
+  private static long encodeZigZag64(final long n) {
+    // Note: the right-shift must be arithmetic
+    return n << 1 ^ n >> 63;
+  }
+
+  /* test only method */
+  public static int calculateBytesForTSandDSID(int dsid) {
+    HeapDataOutputStream out = new HeapDataOutputStream(4 + 8, Version.CURRENT);
+    long now = System.currentTimeMillis();
+    try {
+      writeUnsignedVL(now, out);
+      writeUnsignedVL(InternalDataSerializer.encodeZigZag64(dsid), out);
+    } catch (IOException ignored) {
+      return 0;
+    }
+    return out.size();
+  }
+
+  public static Class<?> getCachedClass(String p_className) throws ClassNotFoundException {
+    String className = processIncomingClassName(p_className);
+    if (LOAD_CLASS_EACH_TIME) {
+      return ClassPathLoader.getLatest().forName(className);
+    } else {
+      Class<?> result = getExistingCachedClass(className);
+      if (result == null) {
+        // Do the forName call outside the sync to fix bug 46172
+        result = ClassPathLoader.getLatest().forName(className);
+        synchronized (cacheAccessLock) {
+          Class<?> cachedClass = getExistingCachedClass(className);
+          if (cachedClass == null) {
+            classCache.put(className, new WeakReference<>(result));
+          } else {
+            result = cachedClass;
+          }
+        }
+      }
+      return result;
+    }
+  }
+
+  private static Class<?> getExistingCachedClass(String className) {
+    WeakReference<Class<?>> wr = classCache.get(className);
+    Class<?> result = null;
+    if (wr != null) {
+      result = wr.get();
+    }
+    return result;
+  }
+
+  public static void flushClassCache() {
+    if (classCache != null) {
+      // Not locking classCache during clear as doing so causes a deadlock in the DeployedJar
+      classCache.clear();
+    }
+  }
+
+  /**
+   * Serializes a list of Integers. The argument may be null. Deserialize with
+   * readListOfIntegers().
+   *
+   * TODO: writeListOfIntegers is unused
+   */
+  public void writeListOfIntegers(List<Integer> list, DataOutput out) throws IOException {
+    int size = -1;
+    if (list != null) {
+      size = list.size();
+    }
+    InternalDataSerializer.writeArrayLength(size, out);
+    for (int i = 0; i < size; i++) {
+      out.writeInt(list.get(i));
+    }
+  }
+
+  /**
+   * Any time new serialization format is added then a new enum needs to be added here.
+   * @since GemFire 6.6.2
+   */
+  private enum SERIALIZATION_VERSION {
+    vINVALID,
+    // includes 6.6.0.x and 6.6.1.x. Note that no serialization changes were made in 6.6 until 6.6.2
+    v660,
+    // 6.6.2.x or later NOTE if you add a new constant make sure and update "latestVersion".
+    v662
+  }
+
+  /**
+   * A listener whose listener methods are invoked when {@link DataSerializer}s and {@link
+   * Instantiator}s are registered. This is part of the fix for bug 31422.
+   * @see InternalDataSerializer#addRegistrationListener
+   * @see InternalDataSerializer#removeRegistrationListener
+   */
+  public interface RegistrationListener {
+
+    /**
+     * Invoked when a new {@code Instantiator} is {@linkplain Instantiator#register(Instantiator)
+     * registered}.
+     */
+    void newInstantiator(Instantiator instantiator);
+
+    /**
+     * Invoked when a new {@code DataSerializer} is {@linkplain DataSerializer#register(Class)
+     * registered}.
+     */
+    void newDataSerializer(DataSerializer ds);
+  }
+
+  /**
+   * A SerializerAttributesHolder holds information required to load a DataSerializer and exists to
+   * allow client/server connections to be created more quickly than they would if the
+   * DataSerializer information downloaded from the server were used to immediately load the
+   * corresponding classes.
+   */
+  public static class SerializerAttributesHolder {
+    private String className = "";
+    private EventID eventId = null;
+    private ClientProxyMembershipID proxyId = null;
+    private int id = 0;
+
+    SerializerAttributesHolder() {
+    }
+
+    SerializerAttributesHolder(String name, EventID event, ClientProxyMembershipID proxy, int id) {
+      this.className = name;
+      this.eventId = event;
+      this.proxyId = proxy;
+      this.id = id;
+    }
+
+    /**
+     * @return String the classname of the data serializer this instance represents.
+     */
+    public String getClassName() {
+      return this.className;
+    }
+
+    public EventID getEventId() {
+      return this.eventId;
+    }
+
+    public ClientProxyMembershipID getProxyId() {
+      return this.proxyId;
+    }
+
+    public int getId() {
+      return this.id;
+    }
+
+    @Override
+    public String toString() {
+      return "SerializerAttributesHolder[name=" + this.className + ",id=" + this.id + ",eventId="
+          + this.eventId + ']';
+    }
+  }
+
+  /**
+   * A marker object for {@code DataSerializer}s that have not been registered. Using this marker
+   * object allows us to asynchronously send {@code DataSerializer} registration updates. If the
+   * serialized bytes arrive at a VM before the registration message does, the deserializer will
+   * wait an amount of time for the registration message to arrive.
+   */
+  abstract static class Marker {
+    /**
+     * The DataSerializer that is filled in upon registration
+     */
+    protected DataSerializer serializer = null;
+
+    /**
+     * set to true once setSerializer is called.
+     */
+    boolean hasBeenSet = false;
+
+    abstract DataSerializer getSerializer();
+
+    /**
+     * Sets the serializer associated with this marker. It will notify any threads that are waiting
+     * for the serializer to be registered.
+     */
+    void setSerializer(DataSerializer serializer) {
+      synchronized (this) {
+        this.hasBeenSet = true;
+        this.serializer = serializer;
+        this.notifyAll();
+      }
+    }
+  }
+
+  /**
+   * A marker object for {@code DataSerializer}s that have not been registered. Using this marker
+   * object allows us to asynchronously send {@code DataSerializer} registration updates. If the
+   * serialized bytes arrive at a VM before the registration message does, the deserializer will
+   * wait an amount of time for the registration message to arrive. Made public for unit test
+   * access.
+   * @since GemFire 5.7
+   */
+  public static class GetMarker extends Marker {
+    /**
+     * Number of milliseconds to wait. Also used by InternalInstantiator. Note that some tests set
+     * this to a small amount to speed up failures. Made public for unit test access.
+     */
+    public static int WAIT_MS = Integer.getInteger(
+        DistributionConfig.GEMFIRE_PREFIX + "InternalDataSerializer.WAIT_MS", 60 * 1000);
+
+    /**
+     * Returns the serializer associated with this marker. If the serializer has not been registered
+     * yet, then this method will wait until the serializer is registered. If this method has to
+     * wait for too long, then {@code null} is returned.
+     */
+    @Override
+    DataSerializer getSerializer() {
+      synchronized (this) {
         boolean firstTime = true;
         long endTime = 0;
         while (!this.hasBeenSet) {
@@ -3415,7 +3541,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
   /**
    * A marker object for {@code DataSerializer}s that is in the process of being registered. It is
    * possible for getSerializer to return {@code null}
-   *
    * @since GemFire 5.7
    */
   static class InitMarker extends Marker {
@@ -3452,29 +3577,27 @@ public abstract class InternalDataSerializer extends DataSerializer {
    */
   public static class RegistrationMessage extends SerialDistributionMessage {
     /**
-     * The id of the {@code DataSerializer} that was registered since 5.7 an int instead of a byte
+     * The versions in which this message was modified
      */
-    private int id;
-
+    private static final Version[] dsfidVersions = new Version[]{};
     /**
      * The eventId of the {@code DataSerializer} that was registered
      */
-    protected EventID eventId;
-
+    protected EventID eventId;
+    /**
+     * The id of the {@code DataSerializer} that was registered since 5.7 an int instead of a byte
+     */
+    private int id;
     /**
      * The name of the {@code DataSerializer} class
      */
     private String className;
 
     /**
-     * The versions in which this message was modified
-     */
-    private static final Version[] dsfidVersions = new Version[] {};
-
-    /**
      * Constructor for {@code DataSerializable}
      */
-    public RegistrationMessage() {}
+    public RegistrationMessage() {
+    }
 
     /**
      * Creates a new {@code RegistrationMessage} that broadcasts that the given {@code
@@ -3592,28 +3715,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
   }
 
   /**
-   * A listener whose listener methods are invoked when {@link DataSerializer}s and {@link
-   * Instantiator}s are registered. This is part of the fix for bug 31422.
-   *
-   * @see InternalDataSerializer#addRegistrationListener
-   * @see InternalDataSerializer#removeRegistrationListener
-   */
-  public interface RegistrationListener {
-
-    /**
-     * Invoked when a new {@code Instantiator} is {@linkplain Instantiator#register(Instantiator)
-     * registered}.
-     */
-    void newInstantiator(Instantiator instantiator);
-
-    /**
-     * Invoked when a new {@code DataSerializer} is {@linkplain DataSerializer#register(Class)
-     * registered}.
-     */
-    void newDataSerializer(DataSerializer ds);
-  }
-
-  /**
    * An {@code ObjectInputStream} whose {@link #resolveClass} method loads classes from the current
    * context class loader.
    */
@@ -3685,7 +3786,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
   /**
    * Used to implement serialization code for the well known classes we support in DataSerializer.
-   *
    * @since GemFire 5.7
    */
   protected abstract static class WellKnownDS extends DataSerializer {
@@ -3716,235 +3816,4 @@ public abstract class InternalDataSerializer extends DataSerializer {
   protected abstract static class WellKnownPdxDS extends WellKnownDS {
     // subclasses need to implement toData
   }
-
-  public static void writeObjectArray(Object[] array, DataOutput out, boolean ensureCompatibility)
-      throws IOException {
-    InternalDataSerializer.checkOut(out);
-    int length;
-    if (array == null) {
-      length = -1;
-    } else {
-      length = array.length;
-    }
-    InternalDataSerializer.writeArrayLength(length, out);
-    if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
-      logger.trace(LogMarker.SERIALIZER_VERBOSE, "Writing Object array of length {}", length);
-    }
-    if (length >= 0) {
-      writeClass(array.getClass().getComponentType(), out);
-      for (int i = 0; i < length; i++) {
-        basicWriteObject(array[i], out, ensureCompatibility);
-      }
-    }
-  }
-
-  // Variable Length long encoded as int in next 4 bytes
-  private static final byte INT_VL = 126;
-
-  // Variable Length long encoded as long in next 8 bytes
-  private static final byte LONG_VL = 127;
-
-  private static final int MAX_BYTE_VL = 125;
-
-  /**
-   * Write a variable length long the old way (pre 7.0). Use this only in contexts where you might
-   * need to communicate with pre 7.0 members or files.
-   */
-  public static void writeVLOld(long data, DataOutput out) throws IOException {
-    if (data < 0) {
-      Assert.fail("Data expected to be >=0 is " + data);
-    }
-    if (data <= MAX_BYTE_VL) {
-      out.writeByte((byte) data);
-    } else if (data <= 0x7FFF) {
-      // set the sign bit to indicate a short
-      out.write(((int) data >>> 8 | 0x80) & 0xFF);
-      out.write((int) data >>> 0 & 0xFF);
-    } else if (data <= Integer.MAX_VALUE) {
-      out.writeByte(INT_VL);
-      out.writeInt((int) data);
-    } else {
-      out.writeByte(LONG_VL);
-      out.writeLong(data);
-    }
-  }
-
-  /**
-   * Write a variable length long the old way (pre 7.0). Use this only in contexts where you might
-   * need to communicate with pre 7.0 members or files.
-   */
-  public static long readVLOld(DataInput in) throws IOException {
-    byte code = in.readByte();
-    long result;
-    if (code < 0) {
-      // mask off sign bit
-      result = code & 0x7F;
-      result <<= 8;
-      result |= in.readByte() & 0xFF;
-    } else if (code <= MAX_BYTE_VL) {
-      result = code;
-    } else if (code == INT_VL) {
-      result = in.readInt();
-    } else if (code == LONG_VL) {
-      result = in.readLong();
-    } else {
-      throw new IllegalStateException("unexpected variable length code=" + code);
-    }
-    return result;
-  }
-
-  /**
-   * Encode a long as a variable length array.
-   *
-   * This method is appropriate for unsigned integers. For signed integers, negative values will
-   * always consume 10 bytes, so it is recommended to use writeSignedVL instead.
-   *
-   * This is taken from the varint encoding in protobufs (BSD licensed). See
-   * https://developers.google.com/protocol-buffers/docs/encoding
-   */
-  public static void writeUnsignedVL(long data, DataOutput out) throws IOException {
-    while (true) {
-      if ((data & ~0x7FL) == 0) {
-        out.writeByte((int) data);
-        return;
-      } else {
-        out.writeByte((int) data & 0x7F | 0x80);
-        data >>>= 7;
-      }
-    }
-  }
-
-  /**
-   * Decode a long as a variable length array.
-   *
-   * This is taken from the varint encoding in protobufs (BSD licensed). See
-   * https://developers.google.com/protocol-buffers/docs/encoding
-   */
-  public static long readUnsignedVL(DataInput in) throws IOException {
-    int shift = 0;
-    long result = 0;
-    while (shift < 64) {
-      final byte b = in.readByte();
-      result |= (long) (b & 0x7F) << shift;
-      if ((b & 0x80) == 0) {
-        return result;
-      }
-      shift += 7;
-    }
-    throw new GemFireIOException("Malformed variable length integer");
-  }
-
-  /**
-   * Encode a signed long as a variable length array.
-   *
-   * This method is appropriate for signed integers. It uses zig zag encoding to so that negative
-   * numbers will be represented more compactly. For unsigned values, writeUnsignedVL will be more
-   * efficient.
-   */
-  public static void writeSignedVL(long data, DataOutput out) throws IOException {
-    writeUnsignedVL(encodeZigZag64(data), out);
-  }
-
-  /**
-   * Decode a signed long as a variable length array.
-   *
-   * This method is appropriate for signed integers. It uses zig zag encoding to so that negative
-   * numbers will be represented more compactly. For unsigned values, writeUnsignedVL will be more
-   * efficient.
-   */
-  public static long readSignedVL(DataInput in) throws IOException {
-    return decodeZigZag64(readUnsignedVL(in));
-  }
-
-  /**
-   * Decode a ZigZag-encoded 64-bit value. ZigZag encodes signed integers into values that can be
-   * efficiently encoded with varint. (Otherwise, negative values must be sign-extended to 64 bits
-   * to be varint encoded, thus always taking 10 bytes on the wire.)
-   *
-   * @param n An unsigned 64-bit integer, stored in a signed int because Java has no explicit
-   *        unsigned support.
-   * @return A signed 64-bit integer.
-   *
-   *         This is taken from the varint encoding in protobufs (BSD licensed). See
-   *         https://developers.google.com/protocol-buffers/docs/encoding
-   */
-  private static long decodeZigZag64(final long n) {
-    return n >>> 1 ^ -(n & 1);
-  }
-
-  /**
-   * Encode a ZigZag-encoded 64-bit value. ZigZag encodes signed integers into values that can be
-   * efficiently encoded with varint. (Otherwise, negative values must be sign-extended to 64 bits
-   * to be varint encoded, thus always taking 10 bytes on the wire.)
-   *
-   * @param n A signed 64-bit integer.
-   * @return An unsigned 64-bit integer, stored in a signed int because Java has no explicit
-   *         unsigned support.
-   *
-   *         This is taken from the varint encoding in protobufs (BSD licensed). See
-   *         https://developers.google.com/protocol-buffers/docs/encoding
-   */
-  private static long encodeZigZag64(final long n) {
-    // Note: the right-shift must be arithmetic
-    return n << 1 ^ n >> 63;
-  }
-
-  /* test only method */
-  public static int calculateBytesForTSandDSID(int dsid) {
-    HeapDataOutputStream out = new HeapDataOutputStream(4 + 8, Version.CURRENT);
-    long now = System.currentTimeMillis();
-    try {
-      writeUnsignedVL(now, out);
-      writeUnsignedVL(InternalDataSerializer.encodeZigZag64(dsid), out);
-    } catch (IOException ignored) {
-      return 0;
-    }
-    return out.size();
-  }
-
-  public static final boolean LOAD_CLASS_EACH_TIME =
-      Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "loadClassOnEveryDeserialization");
-
-  private static final CopyOnWriteHashMap<String, WeakReference<Class<?>>> classCache =
-      LOAD_CLASS_EACH_TIME ? null : new CopyOnWriteHashMap<>();
-
-  private static final Object cacheAccessLock = new Object();
-
-  public static Class<?> getCachedClass(String p_className) throws ClassNotFoundException {
-    String className = processIncomingClassName(p_className);
-    if (LOAD_CLASS_EACH_TIME) {
-      return ClassPathLoader.getLatest().forName(className);
-    } else {
-      Class<?> result = getExistingCachedClass(className);
-      if (result == null) {
-        // Do the forName call outside the sync to fix bug 46172
-        result = ClassPathLoader.getLatest().forName(className);
-        synchronized (cacheAccessLock) {
-          Class<?> cachedClass = getExistingCachedClass(className);
-          if (cachedClass == null) {
-            classCache.put(className, new WeakReference<>(result));
-          } else {
-            result = cachedClass;
-          }
-        }
-      }
-      return result;
-    }
-  }
-
-  private static Class<?> getExistingCachedClass(String className) {
-    WeakReference<Class<?>> wr = classCache.get(className);
-    Class<?> result = null;
-    if (wr != null) {
-      result = wr.get();
-    }
-    return result;
-  }
-
-  public static void flushClassCache() {
-    if (classCache != null) {
-      // Not locking classCache during clear as doing so causes a deadlock in the DeployedJar
-      classCache.clear();
-    }
-  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/util/DscodeHelper.java b/geode-core/src/main/java/org/apache/geode/internal/util/DscodeHelper.java
new file mode 100644
index 0000000..57d7ae2
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/util/DscodeHelper.java
@@ -0,0 +1,19 @@
+package org.apache.geode.internal.util;
+
+import java.util.Arrays;
+
+import org.apache.geode.internal.DSCODE;
+
+public class DscodeHelper {
+
+  private static final DSCODE[] dscodes = new DSCODE[128];
+
+  static {
+    Arrays.stream(DSCODE.values()).filter(dscode -> dscode.toByte() >= 0)
+        .forEach(dscode -> dscodes[dscode.toByte()] = dscode);
+  }
+
+  public static DSCODE toDSCODE(final byte value) {
+    return dscodes[value];
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/DSCODETest.java b/geode-core/src/test/java/org/apache/geode/internal/DSCODETest.java
index 7bc7684..d4007bb 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/DSCODETest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/DSCODETest.java
@@ -16,12 +16,15 @@ package org.apache.geode.internal;
 
 import static org.junit.Assert.assertFalse;
 
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.geode.internal.util.DscodeHelper;
 import org.apache.geode.test.junit.categories.SerializationTest;
 
 @Category({SerializationTest.class})
@@ -36,4 +39,11 @@ public class DSCODETest {
       previouslySeen.add(integerValue);
     }
   }
+
+  @Test
+  public void testGetEnumFromByte() {
+    Arrays.stream(DSCODE.values())
+        .filter(dscode -> dscode != DSCODE.RESERVED_FOR_FUTURE_USE && dscode != DSCODE.ILLEGAL)
+        .forEach(dscode -> Assert.assertEquals(dscode, DscodeHelper.toDSCODE(dscode.toByte())));
+  }
 }