You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2019/05/03 22:01:56 UTC

[geode] branch develop updated: GEODE-6687: replace ByteArrayInputStream+DataInputStream with ByteArrayDataInput (#3547)

This is an automated email from the ASF dual-hosted git repository.

dschneider pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new aa63083  GEODE-6687: replace ByteArrayInputStream+DataInputStream with ByteArrayDataInput (#3547)
aa63083 is described below

commit aa630830f1c3694bc26366dcec3ed0d40b827222
Author: Darrel Schneider <ds...@pivotal.io>
AuthorDate: Fri May 3 15:01:44 2019 -0700

    GEODE-6687: replace ByteArrayInputStream+DataInputStream with ByteArrayDataInput (#3547)
    
    * Code that used DataInputStream(ByteArrayInputStream) as a DataInput now uses ByteArrayDataInput.
    * Added a unit test that showed that ByteArrayDataInput.readFully(byte[])
    would throw IndexOutOfBoundsException instead of EOFException.
    To fix this, readFully(byte[]) was changed to call readFully(byte[],int,int),
    which was correctly implemented to throw EOFException.
    
    * readLine is now implemented on ByteArrayDataInput.
    The old implementation just threw UnsupportedOperationException.
---
 .../geode/cache/client/internal/AbstractOp.java    |   5 +-
 .../cache/client/internal/AuthenticateUserOp.java  |   5 +-
 .../client/internal/ClientSideHandshakeImpl.java   |   8 +-
 .../apache/geode/cache/client/internal/PutOp.java  |   6 +-
 .../apache/geode/cache/wan/EventSequenceID.java    |   5 +-
 .../apache/geode/internal/ByteArrayDataInput.java  |  52 +++++-
 .../geode/internal/cache/EntryEventImpl.java       |   4 +-
 .../org/apache/geode/internal/cache/EventID.java   |  12 +-
 .../geode/internal/cache/FilterRoutingInfo.java    |  13 +-
 .../geode/internal/cache/ha/ThreadIdentifier.java  |   5 +-
 .../geode/internal/cache/tier/sockets/AuthIds.java |   6 +-
 .../cache/tier/sockets/CacheClientProxy.java       |   5 +-
 .../cache/tier/sockets/CacheClientUpdater.java     |   5 +-
 .../tier/sockets/ClientProxyMembershipID.java      |   9 +-
 .../tier/sockets/ClientUpdateMessageImpl.java      |   5 +-
 .../internal/cache/tier/sockets/EncryptorImpl.java |   8 +-
 .../cache/tier/sockets/ServerConnection.java       |   6 +-
 .../apache/geode/internal/offheap/DataType.java    |   5 +-
 .../geode/internal/ByteArrayDataInputTest.java     | 201 +++++++++++++++++++++
 19 files changed, 288 insertions(+), 77 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/AbstractOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/AbstractOp.java
index 0a22879..c65c79d 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/AbstractOp.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/AbstractOp.java
@@ -15,8 +15,6 @@
 
 package org.apache.geode.cache.client.internal;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
 import java.net.SocketTimeoutException;
 
 import org.apache.logging.log4j.Logger;
@@ -24,6 +22,7 @@ import org.apache.logging.log4j.Logger;
 import org.apache.geode.InternalGemFireError;
 import org.apache.geode.cache.client.ServerConnectivityException;
 import org.apache.geode.cache.client.ServerOperationException;
+import org.apache.geode.internal.ByteArrayDataInput;
 import org.apache.geode.internal.HeapDataOutputStream;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.PutAllPartialResultException;
@@ -159,7 +158,7 @@ public abstract class AbstractOp implements Op {
         return;
       }
       byte[] bytes = ((ConnectionImpl) cnx).decryptBytes(partBytes);
-      DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
+      ByteArrayDataInput dis = new ByteArrayDataInput(bytes);
       cnx.setConnectionID(dis.readLong());
     }
   }
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/AuthenticateUserOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/AuthenticateUserOp.java
index ab2b0df..333b68e 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/AuthenticateUserOp.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/AuthenticateUserOp.java
@@ -17,8 +17,6 @@ package org.apache.geode.cache.client.internal;
 
 import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_AUTH_INIT;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
 import java.util.Properties;
 
 import org.apache.geode.DataSerializer;
@@ -29,6 +27,7 @@ import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.ServerLocation;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.ByteArrayDataInput;
 import org.apache.geode.internal.HeapDataOutputStream;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.tier.MessageType;
@@ -182,7 +181,7 @@ public class AuthenticateUserOp {
         } else {
           cnx.getServer().setRequiresCredentials(true);
           byte[] decrypted = ((ConnectionImpl) cnx).decryptBytes(bytes);
-          DataInputStream dis = new DataInputStream(new ByteArrayInputStream(decrypted));
+          ByteArrayDataInput dis = new ByteArrayDataInput(decrypted);
           userId = dis.readLong();
         }
         if (needsServerLocation) {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientSideHandshakeImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientSideHandshakeImpl.java
index 49fecf4..6a25c0d 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientSideHandshakeImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientSideHandshakeImpl.java
@@ -17,7 +17,6 @@ package org.apache.geode.cache.client.internal;
 import static org.apache.geode.distributed.ConfigurationProperties.CONFLATE_EVENTS;
 import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_AUTH_INIT;
 
-import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
@@ -50,6 +49,7 @@ import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.LonerDistributionManager;
 import org.apache.geode.distributed.internal.ServerLocation;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.ByteArrayDataInput;
 import org.apache.geode.internal.HeapDataOutputStream;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.InternalInstantiator;
@@ -269,12 +269,8 @@ public class ClientSideHandshakeImpl extends Handshake implements ClientSideHand
   private InternalDistributedMember readServerMember(DataInputStream p_dis) throws IOException {
 
     byte[] memberBytes = DataSerializer.readByteArray(p_dis);
-    ByteArrayInputStream bais = new ByteArrayInputStream(memberBytes);
-    DataInputStream dis = new DataInputStream(bais);
     Version v = InternalDataSerializer.getVersionForDataStreamOrNull(p_dis);
-    if (v != null) {
-      dis = new VersionedDataInputStream(dis, v);
-    }
+    ByteArrayDataInput dis = new ByteArrayDataInput(memberBytes, v);
     try {
       return DataSerializer.readObject(dis);
     } catch (EOFException e) {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PutOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PutOp.java
index 8026d3f..57b25bd 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PutOp.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PutOp.java
@@ -15,8 +15,6 @@
 
 package org.apache.geode.cache.client.internal;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
 
 import org.apache.logging.log4j.Logger;
 
@@ -29,6 +27,7 @@ import org.apache.geode.cache.client.ServerConnectivityException;
 import org.apache.geode.cache.client.ServerOperationException;
 import org.apache.geode.distributed.internal.ServerLocation;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.ByteArrayDataInput;
 import org.apache.geode.internal.cache.CachedDeserializable;
 import org.apache.geode.internal.cache.EntryEventImpl;
 import org.apache.geode.internal.cache.LocalRegion;
@@ -290,8 +289,7 @@ public class PutOp {
         if ((flags & HAS_OLD_VALUE_FLAG) != 0) {
           oldValue = msg.getPart(partIdx++).getObject();
           if ((flags & OLD_VALUE_IS_OBJECT_FLAG) != 0 && oldValue instanceof byte[]) {
-            ByteArrayInputStream in = new ByteArrayInputStream((byte[]) oldValue);
-            DataInputStream din = new DataInputStream(in);
+            ByteArrayDataInput din = new ByteArrayDataInput((byte[]) oldValue);
             oldValue = DataSerializer.readObject(din);
           }
         }
diff --git a/geode-core/src/main/java/org/apache/geode/cache/wan/EventSequenceID.java b/geode-core/src/main/java/org/apache/geode/cache/wan/EventSequenceID.java
index 5d936b9..8400376 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/wan/EventSequenceID.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/wan/EventSequenceID.java
@@ -14,11 +14,10 @@
  */
 package org.apache.geode.cache.wan;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
 import java.util.Arrays;
 
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.ByteArrayDataInput;
 
 /**
  * This class wraps 1) DistributedMembershipID 2) ThreadID 3) SequenceID attributes which are used
@@ -48,7 +47,7 @@ public class EventSequenceID {
     Object mbr;
     try {
       mbr = InternalDistributedMember
-          .readEssentialData(new DataInputStream(new ByteArrayInputStream(membershipID)));
+          .readEssentialData(new ByteArrayDataInput(membershipID));
     } catch (Exception e) {
       mbr = Arrays.toString(membershipID); // punt and use the bytes
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/ByteArrayDataInput.java b/geode-core/src/main/java/org/apache/geode/internal/ByteArrayDataInput.java
index caed517..74cb361 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/ByteArrayDataInput.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/ByteArrayDataInput.java
@@ -42,6 +42,14 @@ public class ByteArrayDataInput extends InputStream implements DataInput, Versio
    */
   public ByteArrayDataInput() {}
 
+  public ByteArrayDataInput(byte[] bytes) {
+    initialize(bytes, null);
+  }
+
+  public ByteArrayDataInput(byte[] bytes, Version version) {
+    initialize(bytes, version);
+  }
+
   /**
    * Initialize this byte array stream with given byte array and version.
    *
@@ -146,9 +154,7 @@ public class ByteArrayDataInput extends InputStream implements DataInput, Versio
    */
   @Override
   public void readFully(byte[] b) throws IOException {
-    final int len = b.length;
-    System.arraycopy(this.bytes, this.pos, b, 0, len);
-    this.pos += len;
+    readFully(b, 0, b.length);
   }
 
   /**
@@ -391,11 +397,49 @@ public class ByteArrayDataInput extends InputStream implements DataInput, Versio
   }
 
   /**
+   * Behaves like InputStream.read()
+   * Returns the next byte as an int in the range [0..255]
+   * or -1 if at EOF.
+   */
+  private int readByteAsInt() {
+    if (this.pos >= this.nBytes) {
+      return -1;
+    } else {
+      return this.bytes[this.pos++] & 0xff;
+    }
+  }
+
+  /**
    * {@inheritDoc}
    */
   @Override
   public String readLine() throws IOException {
-    throw new UnsupportedOperationException();
+    if (this.pos >= this.nBytes) {
+      return null;
+    }
+    // index of the first byte in the line
+    int startIdx = this.pos;
+    // index of the last byte in the line
+    int lastIdx = -1;
+    while (lastIdx == -1) {
+      int c = readByteAsInt();
+      switch (c) {
+        case -1:
+          lastIdx = this.pos;
+          break;
+        case '\n':
+          lastIdx = this.pos - 1;
+          break;
+        case '\r':
+          lastIdx = this.pos - 1;
+          int c2 = readByteAsInt();
+          if (c2 != '\n' && c2 != -1) {
+            this.pos--;
+          }
+          break;
+      }
+    }
+    return new String(this.bytes, 0, startIdx, lastIdx - startIdx);
   }
 
   /**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
index 7b84506..fecb9b0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
@@ -17,9 +17,7 @@ package org.apache.geode.internal.cache;
 import static org.apache.geode.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_NEW_VALUE;
 import static org.apache.geode.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_OLD_VALUE;
 
-import java.io.ByteArrayInputStream;
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.function.Function;
@@ -1810,7 +1808,7 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent,
       try {
         long start = CachePerfStats.getStatTime();
         ((org.apache.geode.Delta) value)
-            .fromDelta(new DataInputStream(new ByteArrayInputStream(getDeltaBytes())));
+            .fromDelta(new ByteArrayDataInput(getDeltaBytes()));
         getRegion().getCachePerfStats().endDeltaUpdate(start);
         deltaBytesApplied = true;
       } catch (RuntimeException rte) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java
index 0a6ce63..04d1343 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java
@@ -14,9 +14,7 @@
  */
 package org.apache.geode.internal.cache;
 
-import java.io.ByteArrayInputStream;
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.Externalizable;
 import java.io.IOException;
@@ -39,11 +37,11 @@ import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.ByteArrayDataInput;
 import org.apache.geode.internal.DataSerializableFixedID;
 import org.apache.geode.internal.HeapDataOutputStream;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.Version;
-import org.apache.geode.internal.VersionedDataInputStream;
 import org.apache.geode.internal.cache.ha.HARegionQueue;
 import org.apache.geode.internal.cache.ha.ThreadIdentifier;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
@@ -310,13 +308,13 @@ public class EventID implements DataSerializableFixedID, Serializable, Externali
    * have UUID bytes in the memberID. Newer clients don't require this.
    */
   public InternalDistributedMember getDistributedMember(Version targetVersion) {
-    ByteArrayInputStream bais = new ByteArrayInputStream(this.membershipID);
-    DataInputStream dis = new DataInputStream(bais);
+    Version disVersion = null;
     if (targetVersion.compareTo(Version.GEODE_1_1_0) < 0) {
       // GEODE-3153: clients expect to receive UUID bytes, which are only
       // read if the stream's version is 1.0.0-incubating
-      dis = new VersionedDataInputStream(dis, Version.GFE_90);
+      disVersion = Version.GFE_90;
     }
+    ByteArrayDataInput dis = new ByteArrayDataInput(membershipID, disVersion);
     InternalDistributedMember result = null;
     try {
       result = InternalDistributedMember.readEssentialData(dis);
@@ -506,7 +504,7 @@ public class EventID implements DataSerializableFixedID, Serializable, Externali
     Object mbr;
     try {
       mbr = InternalDistributedMember
-          .readEssentialData(new DataInputStream(new ByteArrayInputStream(membershipID)));
+          .readEssentialData(new ByteArrayDataInput(membershipID));
     } catch (Exception e) {
       mbr = membershipID; // punt and use the bytes
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterRoutingInfo.java b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterRoutingInfo.java
index 22af908..835b3e9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterRoutingInfo.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterRoutingInfo.java
@@ -14,12 +14,9 @@
  */
 package org.apache.geode.internal.cache;
 
-import java.io.ByteArrayInputStream;
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -32,11 +29,11 @@ import org.apache.geode.annotations.Immutable;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.ByteArrayDataInput;
 import org.apache.geode.internal.HeapDataOutputStream;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.ObjToByteArraySerializer;
 import org.apache.geode.internal.Version;
-import org.apache.geode.internal.VersionedDataInputStream;
 import org.apache.geode.internal.VersionedDataSerializable;
 
 /**
@@ -490,13 +487,7 @@ public class FilterRoutingInfo implements VersionedDataSerializable {
      */
     private void deserialize() {
       try {
-        InputStream is = new ByteArrayInputStream(myData);
-        DataInputStream dis;
-        if (this.myDataVersion != null) {
-          dis = new VersionedDataInputStream(is, this.myDataVersion);
-        } else {
-          dis = new DataInputStream(is);
-        }
+        ByteArrayDataInput dis = new ByteArrayDataInput(myData, myDataVersion);
         boolean hasCQs = dis.readBoolean();
         if (hasCQs) {
           int numEntries = InternalDataSerializer.readArrayLength(dis);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/ThreadIdentifier.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/ThreadIdentifier.java
index 446ce20..ed7af75 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/ThreadIdentifier.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/ThreadIdentifier.java
@@ -14,9 +14,7 @@
  */
 package org.apache.geode.internal.cache.ha;
 
-import java.io.ByteArrayInputStream;
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Arrays;
@@ -24,6 +22,7 @@ import java.util.Arrays;
 import org.apache.geode.DataSerializable;
 import org.apache.geode.DataSerializer;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.ByteArrayDataInput;
 import org.apache.geode.internal.cache.EventID;
 
 /**
@@ -204,7 +203,7 @@ public class ThreadIdentifier implements DataSerializable {
     Object mbr;
     try {
       mbr = InternalDistributedMember
-          .readEssentialData(new DataInputStream(new ByteArrayInputStream(membershipID)));
+          .readEssentialData(new ByteArrayDataInput(membershipID));
     } catch (Exception e) {
       mbr = membershipID; // punt and use the bytes
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AuthIds.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AuthIds.java
index 556cb36..9bf4b3e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AuthIds.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AuthIds.java
@@ -16,15 +16,15 @@
 
 package org.apache.geode.internal.cache.tier.sockets;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
+
+import org.apache.geode.internal.ByteArrayDataInput;
 
 public class AuthIds {
   private long connectionId;
   private long uniqueId;
 
   public AuthIds(byte[] bytes) throws Exception {
-    DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
+    ByteArrayDataInput dis = new ByteArrayDataInput(bytes);
     if (bytes.length == 8) {
       // only connectionid
       connectionId = dis.readLong();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index acf21cb..5c12281 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -14,8 +14,6 @@
  */
 package org.apache.geode.internal.cache.tier.sockets;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.Socket;
 import java.net.SocketException;
@@ -70,6 +68,7 @@ import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.ByteArrayDataInput;
 import org.apache.geode.internal.SystemTimer;
 import org.apache.geode.internal.SystemTimer.SystemTimerTask;
 import org.apache.geode.internal.Version;
@@ -2852,7 +2851,7 @@ public class CacheClientProxy implements ClientSession {
       // This is a debugging method so ignore all exceptions like
       // ClassNotFoundException
       try {
-        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(serializedBytes));
+        ByteArrayDataInput dis = new ByteArrayDataInput(serializedBytes);
         deserializedObject = DataSerializer.readObject(dis);
       } catch (Exception e) {
       }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
index d492969..c335e88 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
@@ -14,8 +14,6 @@
  */
 package org.apache.geode.internal.cache.tier.sockets;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InterruptedIOException;
@@ -65,6 +63,7 @@ import org.apache.geode.distributed.internal.ServerLocation;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.distributed.internal.membership.MemberAttributes;
 import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.ByteArrayDataInput;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.InternalInstantiator;
 import org.apache.geode.internal.Version;
@@ -1804,7 +1803,7 @@ public class CacheClientUpdater extends LoggingThread implements ClientUpdater,
     Object deserializedObject = serializedBytes;
     // This is a debugging method so ignore all exceptions like ClassNotFoundException
     try {
-      DataInputStream dis = new DataInputStream(new ByteArrayInputStream(serializedBytes));
+      ByteArrayDataInput dis = new ByteArrayDataInput(serializedBytes);
       deserializedObject = DataSerializer.readObject(dis);
     } catch (ClassNotFoundException | IOException ignore) {
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java
index 64ad8f9..be95048 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java
@@ -16,9 +16,7 @@ package org.apache.geode.internal.cache.tier.sockets;
 
 import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
 
-import java.io.ByteArrayInputStream;
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.Externalizable;
 import java.io.IOException;
@@ -38,10 +36,10 @@ import org.apache.geode.distributed.DurableClientAttributes;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.ByteArrayDataInput;
 import org.apache.geode.internal.DataSerializableFixedID;
 import org.apache.geode.internal.HeapDataOutputStream;
 import org.apache.geode.internal.Version;
-import org.apache.geode.internal.VersionedDataInputStream;
 import org.apache.geode.internal.logging.LogService;
 
 /**
@@ -393,10 +391,9 @@ public class ClientProxyMembershipID
 
   public DistributedMember getDistributedMember() {
     if (memberId == null) {
-      ByteArrayInputStream bais = new ByteArrayInputStream(identity);
-      DataInputStream dis = new VersionedDataInputStream(bais, Version.CURRENT);
+      ByteArrayDataInput dataInput = new ByteArrayDataInput(identity);
       try {
-        memberId = (DistributedMember) DataSerializer.readObject(dis);
+        memberId = (DistributedMember) DataSerializer.readObject(dataInput);
       } catch (Exception e) {
         logger.error("Unable to deserialize membership id", e);
       }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
index 36f9f7f..64c180b 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
@@ -15,9 +15,7 @@
 
 package org.apache.geode.internal.cache.tier.sockets;
 
-import java.io.ByteArrayInputStream;
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.HashMap;
@@ -34,6 +32,7 @@ import org.apache.geode.GemFireIOException;
 import org.apache.geode.InternalGemFireError;
 import org.apache.geode.cache.query.internal.cq.InternalCqQuery;
 import org.apache.geode.cache.util.ObjectSizer;
+import org.apache.geode.internal.ByteArrayDataInput;
 import org.apache.geode.internal.DSCODE;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.Sendable;
@@ -1219,7 +1218,7 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
     // This is a debugging method so ignore all exceptions like
     // ClassNotFoundException
     try {
-      DataInputStream dis = new DataInputStream(new ByteArrayInputStream(serializedBytes));
+      ByteArrayDataInput dis = new ByteArrayDataInput(serializedBytes);
       deserializedObject = DataSerializer.readObject(dis);
     } catch (Exception e) {
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/EncryptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/EncryptorImpl.java
index 66d6180..abe74fa 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/EncryptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/EncryptorImpl.java
@@ -23,7 +23,6 @@ import static org.apache.geode.internal.cache.tier.sockets.Handshake.PUBLIC_KEY_
 import static org.apache.geode.internal.cache.tier.sockets.Handshake.REPLY_AUTH_NOT_REQUIRED;
 import static org.apache.geode.internal.cache.tier.sockets.Handshake.REPLY_OK;
 
-import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.FileInputStream;
@@ -66,6 +65,7 @@ import org.apache.geode.LogWriter;
 import org.apache.geode.annotations.internal.MakeNotStatic;
 import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.internal.ByteArrayDataInput;
 import org.apache.geode.internal.HeapDataOutputStream;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.tier.Encryptor;
@@ -582,8 +582,7 @@ public class EncryptorImpl implements Encryptor {
       byte[] encBytes = DataSerializer.readByteArray(dis);
       Cipher c = getDecryptCipher(this.clientSKAlgo, this.clientPublicKey);
       byte[] credentialBytes = decryptBytes(encBytes, c);
-      ByteArrayInputStream bis = new ByteArrayInputStream(credentialBytes);
-      DataInputStream dinp = new DataInputStream(bis);
+      ByteArrayDataInput dinp = new ByteArrayDataInput(credentialBytes);
       // credentials = DataSerializer.readProperties(dinp);//Hitesh: we don't send in handshake
       // now
       byte[] challengeRes = DataSerializer.readByteArray(dinp);
@@ -687,8 +686,7 @@ public class EncryptorImpl implements Encryptor {
       }
 
       byte[] credentialBytes = decrypt.doFinal(encBytes);
-      ByteArrayInputStream bis = new ByteArrayInputStream(credentialBytes);
-      DataInputStream dinp = new DataInputStream(bis);
+      ByteArrayDataInput dinp = new ByteArrayDataInput(credentialBytes);
       credentials = DataSerializer.readProperties(dinp);
       byte[] challengeRes = DataSerializer.readByteArray(dinp);
       // Check the challenge string
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index aeedd9e..d433b81 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -18,8 +18,6 @@ import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIE
 import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_ACCESSOR_PP;
 import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_AUTHENTICATOR;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
@@ -55,6 +53,7 @@ import org.apache.geode.cache.UnsupportedVersionException;
 import org.apache.geode.cache.client.internal.Connection;
 import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.ByteArrayDataInput;
 import org.apache.geode.internal.HeapDataOutputStream;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.EventID;
@@ -1070,8 +1069,7 @@ public abstract class ServerConnection implements Runnable {
 
       credBytes = handshake.getEncryptor().decryptBytes(credBytes);
 
-      ByteArrayInputStream bis = new ByteArrayInputStream(credBytes);
-      DataInputStream dinp = new DataInputStream(bis);
+      ByteArrayDataInput dinp = new ByteArrayDataInput(credBytes);
       Properties credentials = DataSerializer.readProperties(dinp);
 
       // When here, security is enforced on server, if login returns a subject, then it's the newly
diff --git a/geode-core/src/main/java/org/apache/geode/internal/offheap/DataType.java b/geode-core/src/main/java/org/apache/geode/internal/offheap/DataType.java
index 35183a5..e4c495f 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/offheap/DataType.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/offheap/DataType.java
@@ -14,13 +14,12 @@
  */
 package org.apache.geode.internal.offheap;
 
-import java.io.ByteArrayInputStream;
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.IOException;
 
 import org.apache.geode.DataSerializer;
 import org.apache.geode.Instantiator;
+import org.apache.geode.internal.ByteArrayDataInput;
 import org.apache.geode.internal.DSCODE;
 import org.apache.geode.internal.DSFIDFactory;
 import org.apache.geode.internal.InternalDataSerializer;
@@ -280,6 +279,6 @@ public class DataType {
   }
 
   public static DataInput getDataInput(byte[] bytes) {
-    return new DataInputStream(new ByteArrayInputStream(bytes));
+    return new ByteArrayDataInput(bytes); // replaces DataInputStream(ByteArrayInputStream)
   }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/ByteArrayDataInputTest.java b/geode-core/src/test/java/org/apache/geode/internal/ByteArrayDataInputTest.java
new file mode 100644
index 0000000..5a89dd0
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/ByteArrayDataInputTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
+
+import java.io.DataInput;
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.junit.Test;
+
+public class ByteArrayDataInputTest {
+  @Test
+  public void readFullyThatReadsPastEndOfDataThrowsEOFException() throws IOException {
+    // Only one byte is available, but the destination buffer asks for two.
+    DataInput in = createDataInput(new byte[1]);
+    byte[] destination = new byte[2];
+
+    Throwable thrown = catchThrowable(() -> in.readFully(destination));
+
+    assertThat(thrown).isInstanceOf(EOFException.class);
+  }
+
+  @Test
+  public void readLineGivenInputAtEOFReturnsNull() throws IOException {
+    DataInput in = createDataInput(new byte[1]);
+    // Consume the only byte so the input is already at end of data.
+    in.readByte();
+
+    String line = in.readLine();
+
+    assertThat(line).isNull();
+    assertThat(dataRemaining(in)).isEqualTo(0);
+  }
+
+  @Test
+  public void readLineGivenEmptyLineTerminatedByLineFeedReturnsEmptyString() throws IOException {
+    // A lone line feed terminates an empty line.
+    DataInput in = createDataInput(new byte[] {'\n'});
+
+    String line = in.readLine();
+
+    assertThat(line).isEmpty();
+    assertThat(dataRemaining(in)).isEqualTo(0);
+  }
+
+  @Test
+  public void readLineGivenEmptyLineTerminatedByLineFeedReturnsEmptyStringDoesNotConsumeNextByte()
+      throws IOException {
+    // The byte after the line feed must still be readable afterwards.
+    DataInput in = createDataInput(new byte[] {'\n', 'a'});
+
+    String line = in.readLine();
+
+    assertThat(line).isEmpty();
+    assertThat(dataRemaining(in)).isEqualTo(1);
+    assertThat(in.readUnsignedByte()).isEqualTo((int) 'a');
+  }
+
+  @Test
+  public void readLineGivenEmptyLineTerminatedByCarriageReturnReturnsEmptyString()
+      throws IOException {
+    // A lone carriage return terminates an empty line.
+    DataInput in = createDataInput(new byte[] {'\r'});
+
+    String line = in.readLine();
+
+    assertThat(line).isEmpty();
+    assertThat(dataRemaining(in)).isEqualTo(0);
+  }
+
+  @Test
+  public void readLineGivenEmptyLineTerminatedByCarriageReturnReturnsEmptyStringAndDoesNotConsumeNextByte()
+      throws IOException {
+    // A carriage return not followed by a line feed must leave the next byte unread.
+    DataInput in = createDataInput(new byte[] {'\r', 'a'});
+
+    String line = in.readLine();
+
+    assertThat(line).isEmpty();
+    assertThat(dataRemaining(in)).isEqualTo(1);
+    assertThat(in.readUnsignedByte()).isEqualTo((int) 'a');
+  }
+
+  @Test
+  public void readLineGivenEmptyLineTerminatedByCarriageReturnLineFeedReturnsEmptyString()
+      throws IOException {
+    // The carriage return + line feed pair counts as one terminator.
+    DataInput in = createDataInput(new byte[] {'\r', '\n'});
+
+    String line = in.readLine();
+
+    assertThat(line).isEmpty();
+    assertThat(dataRemaining(in)).isEqualTo(0);
+  }
+
+  @Test
+  public void readLineGivenEmptyLineTerminatedByCarriageReturnLineFeedReturnsEmptyStringDoesNotConsumeNextByte()
+      throws IOException {
+    // Both terminator bytes are consumed; the byte after them is not.
+    DataInput in = createDataInput(new byte[] {'\r', '\n', 'a'});
+
+    String line = in.readLine();
+
+    assertThat(line).isEmpty();
+    assertThat(dataRemaining(in)).isEqualTo(1);
+    assertThat(in.readUnsignedByte()).isEqualTo((int) 'a');
+  }
+
+  @Test
+  public void readLineGivenLineTerminatedByEOFReturnsCorrectLineData() throws IOException {
+    // No terminator at all: end of data ends the line.
+    DataInput in = createDataInput(new byte[] {'a', 'b', 'c'});
+
+    String line = in.readLine();
+
+    assertThat(line).isEqualTo("abc");
+    assertThat(dataRemaining(in)).isEqualTo(0);
+  }
+
+  @Test
+  public void readLineGivenLineTerminatedByLineFeedReturnsCorrectLineData() throws IOException {
+    DataInput in = createDataInput(new byte[] {'a', 'b', 'c', '\n', '2'});
+
+    String line = in.readLine();
+
+    // The terminator is dropped from the result and the following byte stays unread.
+    assertThat(line).isEqualTo("abc");
+    assertThat(dataRemaining(in)).isEqualTo(1);
+    assertThat(in.readUnsignedByte()).isEqualTo((int) '2');
+  }
+
+  @Test
+  public void readLineGivenLineTerminatedByCarriageReturnReturnsCorrectLineData()
+      throws IOException {
+    // A carriage return followed by ordinary data ends the line on its own.
+    DataInput in = createDataInput(new byte[] {'a', 'b', 'c', '\r', '2'});
+
+    String line = in.readLine();
+
+    assertThat(line).isEqualTo("abc");
+    assertThat(dataRemaining(in)).isEqualTo(1);
+    assertThat(in.readUnsignedByte()).isEqualTo((int) '2');
+  }
+
+  @Test
+  public void readLineGivenLineTerminatedByCarriageReturnLineFeedReturnsCorrectLineData()
+      throws IOException {
+    // Carriage return + line feed are consumed together, leaving only the trailing byte.
+    DataInput in = createDataInput(new byte[] {'a', 'b', 'c', '\r', '\n', '2'});
+
+    String line = in.readLine();
+
+    assertThat(line).isEqualTo("abc");
+    assertThat(dataRemaining(in)).isEqualTo(1);
+    assertThat(in.readUnsignedByte()).isEqualTo((int) '2');
+  }
+
+  /**
+   * ByteArrayDataInput is expected to behave like DataInputStream(ByteArrayInputStream).
+   * Flip this flag to true locally to run the same tests against the JDK classes and
+   * confirm both behave identically. It must never be committed as testJDK=true.
+   */
+  private boolean testJDK = false;
+
+  // Builds whichever DataInput implementation testJDK selects over the given bytes.
+  private DataInput createDataInput(byte[] inputBytes) {
+    if (testJDK) {
+      return new java.io.DataInputStream(new java.io.ByteArrayInputStream(inputBytes));
+    }
+    ByteArrayDataInput geodeInput = new ByteArrayDataInput();
+    geodeInput.initialize(inputBytes, null);
+    return geodeInput;
+  }
+
+  // Returns what available() reports on the concrete type behind the DataInput.
+  private int dataRemaining(DataInput dataInput) {
+    if (!testJDK) {
+      return ((ByteArrayDataInput) dataInput).available();
+    }
+    try {
+      return ((java.io.DataInputStream) dataInput).available();
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+}