You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2019/05/03 22:01:56 UTC
[geode] branch develop updated: GEODE-6687: replace
ByteArrayInputStream+DataInputStream with ByteArrayDataInput (#3547)
This is an automated email from the ASF dual-hosted git repository.
dschneider pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new aa63083 GEODE-6687: replace ByteArrayInputStream+DataInputStream with ByteArrayDataInput (#3547)
aa63083 is described below
commit aa630830f1c3694bc26366dcec3ed0d40b827222
Author: Darrel Schneider <ds...@pivotal.io>
AuthorDate: Fri May 3 15:01:44 2019 -0700
GEODE-6687: replace ByteArrayInputStream+DataInputStream with ByteArrayDataInput (#3547)
* Code that used DataInputStream(ByteArrayInputStream) as a DataInput now use ByteArrayDataInput.
* Added a unit test that showed that ByteArrayDataInput.readFully(byte[])
would throw IndexOutOfBoundsException instead of EOFException.
To fix this changed readFully(byte[]) to call readFully(byte[],int,int)
which was correctly implemented to throw EOFException.
* readLine is now implemented on ByteArrayDataInput.
The old implementation just threw an exception.
---
.../geode/cache/client/internal/AbstractOp.java | 5 +-
.../cache/client/internal/AuthenticateUserOp.java | 5 +-
.../client/internal/ClientSideHandshakeImpl.java | 8 +-
.../apache/geode/cache/client/internal/PutOp.java | 6 +-
.../apache/geode/cache/wan/EventSequenceID.java | 5 +-
.../apache/geode/internal/ByteArrayDataInput.java | 52 +++++-
.../geode/internal/cache/EntryEventImpl.java | 4 +-
.../org/apache/geode/internal/cache/EventID.java | 12 +-
.../geode/internal/cache/FilterRoutingInfo.java | 13 +-
.../geode/internal/cache/ha/ThreadIdentifier.java | 5 +-
.../geode/internal/cache/tier/sockets/AuthIds.java | 6 +-
.../cache/tier/sockets/CacheClientProxy.java | 5 +-
.../cache/tier/sockets/CacheClientUpdater.java | 5 +-
.../tier/sockets/ClientProxyMembershipID.java | 9 +-
.../tier/sockets/ClientUpdateMessageImpl.java | 5 +-
.../internal/cache/tier/sockets/EncryptorImpl.java | 8 +-
.../cache/tier/sockets/ServerConnection.java | 6 +-
.../apache/geode/internal/offheap/DataType.java | 5 +-
.../geode/internal/ByteArrayDataInputTest.java | 201 +++++++++++++++++++++
19 files changed, 288 insertions(+), 77 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/AbstractOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/AbstractOp.java
index 0a22879..c65c79d 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/AbstractOp.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/AbstractOp.java
@@ -15,8 +15,6 @@
package org.apache.geode.cache.client.internal;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
import java.net.SocketTimeoutException;
import org.apache.logging.log4j.Logger;
@@ -24,6 +22,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.cache.client.ServerConnectivityException;
import org.apache.geode.cache.client.ServerOperationException;
+import org.apache.geode.internal.ByteArrayDataInput;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.PutAllPartialResultException;
@@ -159,7 +158,7 @@ public abstract class AbstractOp implements Op {
return;
}
byte[] bytes = ((ConnectionImpl) cnx).decryptBytes(partBytes);
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
+ ByteArrayDataInput dis = new ByteArrayDataInput(bytes);
cnx.setConnectionID(dis.readLong());
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/AuthenticateUserOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/AuthenticateUserOp.java
index ab2b0df..333b68e 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/AuthenticateUserOp.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/AuthenticateUserOp.java
@@ -17,8 +17,6 @@ package org.apache.geode.cache.client.internal;
import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_AUTH_INIT;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
import java.util.Properties;
import org.apache.geode.DataSerializer;
@@ -29,6 +27,7 @@ import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.ByteArrayDataInput;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.tier.MessageType;
@@ -182,7 +181,7 @@ public class AuthenticateUserOp {
} else {
cnx.getServer().setRequiresCredentials(true);
byte[] decrypted = ((ConnectionImpl) cnx).decryptBytes(bytes);
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(decrypted));
+ ByteArrayDataInput dis = new ByteArrayDataInput(decrypted);
userId = dis.readLong();
}
if (needsServerLocation) {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientSideHandshakeImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientSideHandshakeImpl.java
index 49fecf4..6a25c0d 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientSideHandshakeImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientSideHandshakeImpl.java
@@ -17,7 +17,6 @@ package org.apache.geode.cache.client.internal;
import static org.apache.geode.distributed.ConfigurationProperties.CONFLATE_EVENTS;
import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_AUTH_INIT;
-import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
@@ -50,6 +49,7 @@ import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.LonerDistributionManager;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.ByteArrayDataInput;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.InternalInstantiator;
@@ -269,12 +269,8 @@ public class ClientSideHandshakeImpl extends Handshake implements ClientSideHand
private InternalDistributedMember readServerMember(DataInputStream p_dis) throws IOException {
byte[] memberBytes = DataSerializer.readByteArray(p_dis);
- ByteArrayInputStream bais = new ByteArrayInputStream(memberBytes);
- DataInputStream dis = new DataInputStream(bais);
Version v = InternalDataSerializer.getVersionForDataStreamOrNull(p_dis);
- if (v != null) {
- dis = new VersionedDataInputStream(dis, v);
- }
+ ByteArrayDataInput dis = new ByteArrayDataInput(memberBytes, v);
try {
return DataSerializer.readObject(dis);
} catch (EOFException e) {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PutOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PutOp.java
index 8026d3f..57b25bd 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PutOp.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PutOp.java
@@ -15,8 +15,6 @@
package org.apache.geode.cache.client.internal;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
import org.apache.logging.log4j.Logger;
@@ -29,6 +27,7 @@ import org.apache.geode.cache.client.ServerConnectivityException;
import org.apache.geode.cache.client.ServerOperationException;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.ByteArrayDataInput;
import org.apache.geode.internal.cache.CachedDeserializable;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.LocalRegion;
@@ -290,8 +289,7 @@ public class PutOp {
if ((flags & HAS_OLD_VALUE_FLAG) != 0) {
oldValue = msg.getPart(partIdx++).getObject();
if ((flags & OLD_VALUE_IS_OBJECT_FLAG) != 0 && oldValue instanceof byte[]) {
- ByteArrayInputStream in = new ByteArrayInputStream((byte[]) oldValue);
- DataInputStream din = new DataInputStream(in);
+ ByteArrayDataInput din = new ByteArrayDataInput((byte[]) oldValue);
oldValue = DataSerializer.readObject(din);
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/wan/EventSequenceID.java b/geode-core/src/main/java/org/apache/geode/cache/wan/EventSequenceID.java
index 5d936b9..8400376 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/wan/EventSequenceID.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/wan/EventSequenceID.java
@@ -14,11 +14,10 @@
*/
package org.apache.geode.cache.wan;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
import java.util.Arrays;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.ByteArrayDataInput;
/**
* This class wraps 1) DistributedMembershipID 2) ThreadID 3) SequenceID attributes which are used
@@ -48,7 +47,7 @@ public class EventSequenceID {
Object mbr;
try {
mbr = InternalDistributedMember
- .readEssentialData(new DataInputStream(new ByteArrayInputStream(membershipID)));
+ .readEssentialData(new ByteArrayDataInput(membershipID));
} catch (Exception e) {
mbr = Arrays.toString(membershipID); // punt and use the bytes
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/ByteArrayDataInput.java b/geode-core/src/main/java/org/apache/geode/internal/ByteArrayDataInput.java
index caed517..74cb361 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/ByteArrayDataInput.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/ByteArrayDataInput.java
@@ -42,6 +42,14 @@ public class ByteArrayDataInput extends InputStream implements DataInput, Versio
*/
public ByteArrayDataInput() {}
+ public ByteArrayDataInput(byte[] bytes) {
+ initialize(bytes, null);
+ }
+
+ public ByteArrayDataInput(byte[] bytes, Version version) {
+ initialize(bytes, version);
+ }
+
/**
* Initialize this byte array stream with given byte array and version.
*
@@ -146,9 +154,7 @@ public class ByteArrayDataInput extends InputStream implements DataInput, Versio
*/
@Override
public void readFully(byte[] b) throws IOException {
- final int len = b.length;
- System.arraycopy(this.bytes, this.pos, b, 0, len);
- this.pos += len;
+ readFully(b, 0, b.length);
}
/**
@@ -391,11 +397,49 @@ public class ByteArrayDataInput extends InputStream implements DataInput, Versio
}
/**
+ * Behaves like InputStream.read()
+ * Returns the next byte as an int in the range [0..255]
+ * or -1 if at EOF.
+ */
+ private int readByteAsInt() {
+ if (this.pos >= this.nBytes) {
+ return -1;
+ } else {
+ return this.bytes[this.pos++] & 0xff;
+ }
+ }
+
+ /**
* {@inheritDoc}
*/
@Override
public String readLine() throws IOException {
- throw new UnsupportedOperationException();
+ if (this.pos >= this.nBytes) {
+ return null;
+ }
+ // index of the first byte in the line
+ int startIdx = this.pos;
+ // index of the last byte in the line
+ int lastIdx = -1;
+ while (lastIdx == -1) {
+ int c = readByteAsInt();
+ switch (c) {
+ case -1:
+ lastIdx = this.pos;
+ break;
+ case '\n':
+ lastIdx = this.pos - 1;
+ break;
+ case '\r':
+ lastIdx = this.pos - 1;
+ int c2 = readByteAsInt();
+ if (c2 != '\n' && c2 != -1) {
+ this.pos--;
+ }
+ break;
+ }
+ }
+ return new String(this.bytes, 0, startIdx, lastIdx - startIdx);
}
/**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
index 7b84506..fecb9b0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
@@ -17,9 +17,7 @@ package org.apache.geode.internal.cache;
import static org.apache.geode.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_NEW_VALUE;
import static org.apache.geode.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_OLD_VALUE;
-import java.io.ByteArrayInputStream;
import java.io.DataInput;
-import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.util.function.Function;
@@ -1810,7 +1808,7 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent,
try {
long start = CachePerfStats.getStatTime();
((org.apache.geode.Delta) value)
- .fromDelta(new DataInputStream(new ByteArrayInputStream(getDeltaBytes())));
+ .fromDelta(new ByteArrayDataInput(getDeltaBytes()));
getRegion().getCachePerfStats().endDeltaUpdate(start);
deltaBytesApplied = true;
} catch (RuntimeException rte) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java
index 0a6ce63..04d1343 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java
@@ -14,9 +14,7 @@
*/
package org.apache.geode.internal.cache;
-import java.io.ByteArrayInputStream;
import java.io.DataInput;
-import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.Externalizable;
import java.io.IOException;
@@ -39,11 +37,11 @@ import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.ByteArrayDataInput;
import org.apache.geode.internal.DataSerializableFixedID;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.Version;
-import org.apache.geode.internal.VersionedDataInputStream;
import org.apache.geode.internal.cache.ha.HARegionQueue;
import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
@@ -310,13 +308,13 @@ public class EventID implements DataSerializableFixedID, Serializable, Externali
* have UUID bytes in the memberID. Newer clients don't require this.
*/
public InternalDistributedMember getDistributedMember(Version targetVersion) {
- ByteArrayInputStream bais = new ByteArrayInputStream(this.membershipID);
- DataInputStream dis = new DataInputStream(bais);
+ Version disVersion = null;
if (targetVersion.compareTo(Version.GEODE_1_1_0) < 0) {
// GEODE-3153: clients expect to receive UUID bytes, which are only
// read if the stream's version is 1.0.0-incubating
- dis = new VersionedDataInputStream(dis, Version.GFE_90);
+ disVersion = Version.GFE_90;
}
+ ByteArrayDataInput dis = new ByteArrayDataInput(membershipID, disVersion);
InternalDistributedMember result = null;
try {
result = InternalDistributedMember.readEssentialData(dis);
@@ -506,7 +504,7 @@ public class EventID implements DataSerializableFixedID, Serializable, Externali
Object mbr;
try {
mbr = InternalDistributedMember
- .readEssentialData(new DataInputStream(new ByteArrayInputStream(membershipID)));
+ .readEssentialData(new ByteArrayDataInput(membershipID));
} catch (Exception e) {
mbr = membershipID; // punt and use the bytes
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterRoutingInfo.java b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterRoutingInfo.java
index 22af908..835b3e9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterRoutingInfo.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterRoutingInfo.java
@@ -14,12 +14,9 @@
*/
package org.apache.geode.internal.cache;
-import java.io.ByteArrayInputStream;
import java.io.DataInput;
-import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
-import java.io.InputStream;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -32,11 +29,11 @@ import org.apache.geode.annotations.Immutable;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.ByteArrayDataInput;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.ObjToByteArraySerializer;
import org.apache.geode.internal.Version;
-import org.apache.geode.internal.VersionedDataInputStream;
import org.apache.geode.internal.VersionedDataSerializable;
/**
@@ -490,13 +487,7 @@ public class FilterRoutingInfo implements VersionedDataSerializable {
*/
private void deserialize() {
try {
- InputStream is = new ByteArrayInputStream(myData);
- DataInputStream dis;
- if (this.myDataVersion != null) {
- dis = new VersionedDataInputStream(is, this.myDataVersion);
- } else {
- dis = new DataInputStream(is);
- }
+ ByteArrayDataInput dis = new ByteArrayDataInput(myData, myDataVersion);
boolean hasCQs = dis.readBoolean();
if (hasCQs) {
int numEntries = InternalDataSerializer.readArrayLength(dis);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/ThreadIdentifier.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/ThreadIdentifier.java
index 446ce20..ed7af75 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/ThreadIdentifier.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/ThreadIdentifier.java
@@ -14,9 +14,7 @@
*/
package org.apache.geode.internal.cache.ha;
-import java.io.ByteArrayInputStream;
import java.io.DataInput;
-import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
@@ -24,6 +22,7 @@ import java.util.Arrays;
import org.apache.geode.DataSerializable;
import org.apache.geode.DataSerializer;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.ByteArrayDataInput;
import org.apache.geode.internal.cache.EventID;
/**
@@ -204,7 +203,7 @@ public class ThreadIdentifier implements DataSerializable {
Object mbr;
try {
mbr = InternalDistributedMember
- .readEssentialData(new DataInputStream(new ByteArrayInputStream(membershipID)));
+ .readEssentialData(new ByteArrayDataInput(membershipID));
} catch (Exception e) {
mbr = membershipID; // punt and use the bytes
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AuthIds.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AuthIds.java
index 556cb36..9bf4b3e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AuthIds.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AuthIds.java
@@ -16,15 +16,15 @@
package org.apache.geode.internal.cache.tier.sockets;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
+
+import org.apache.geode.internal.ByteArrayDataInput;
public class AuthIds {
private long connectionId;
private long uniqueId;
public AuthIds(byte[] bytes) throws Exception {
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
+ ByteArrayDataInput dis = new ByteArrayDataInput(bytes);
if (bytes.length == 8) {
// only connectionid
connectionId = dis.readLong();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index acf21cb..5c12281 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -14,8 +14,6 @@
*/
package org.apache.geode.internal.cache.tier.sockets;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;
@@ -70,6 +68,7 @@ import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.ByteArrayDataInput;
import org.apache.geode.internal.SystemTimer;
import org.apache.geode.internal.SystemTimer.SystemTimerTask;
import org.apache.geode.internal.Version;
@@ -2852,7 +2851,7 @@ public class CacheClientProxy implements ClientSession {
// This is a debugging method so ignore all exceptions like
// ClassNotFoundException
try {
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(serializedBytes));
+ ByteArrayDataInput dis = new ByteArrayDataInput(serializedBytes);
deserializedObject = DataSerializer.readObject(dis);
} catch (Exception e) {
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
index d492969..c335e88 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
@@ -14,8 +14,6 @@
*/
package org.apache.geode.internal.cache.tier.sockets;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
@@ -65,6 +63,7 @@ import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.membership.MemberAttributes;
import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.ByteArrayDataInput;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.InternalInstantiator;
import org.apache.geode.internal.Version;
@@ -1804,7 +1803,7 @@ public class CacheClientUpdater extends LoggingThread implements ClientUpdater,
Object deserializedObject = serializedBytes;
// This is a debugging method so ignore all exceptions like ClassNotFoundException
try {
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(serializedBytes));
+ ByteArrayDataInput dis = new ByteArrayDataInput(serializedBytes);
deserializedObject = DataSerializer.readObject(dis);
} catch (ClassNotFoundException | IOException ignore) {
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java
index 64ad8f9..be95048 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java
@@ -16,9 +16,7 @@ package org.apache.geode.internal.cache.tier.sockets;
import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
-import java.io.ByteArrayInputStream;
import java.io.DataInput;
-import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.Externalizable;
import java.io.IOException;
@@ -38,10 +36,10 @@ import org.apache.geode.distributed.DurableClientAttributes;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.ByteArrayDataInput;
import org.apache.geode.internal.DataSerializableFixedID;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.Version;
-import org.apache.geode.internal.VersionedDataInputStream;
import org.apache.geode.internal.logging.LogService;
/**
@@ -393,10 +391,9 @@ public class ClientProxyMembershipID
public DistributedMember getDistributedMember() {
if (memberId == null) {
- ByteArrayInputStream bais = new ByteArrayInputStream(identity);
- DataInputStream dis = new VersionedDataInputStream(bais, Version.CURRENT);
+ ByteArrayDataInput dataInput = new ByteArrayDataInput(identity);
try {
- memberId = (DistributedMember) DataSerializer.readObject(dis);
+ memberId = (DistributedMember) DataSerializer.readObject(dataInput);
} catch (Exception e) {
logger.error("Unable to deserialize membership id", e);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
index 36f9f7f..64c180b 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
@@ -15,9 +15,7 @@
package org.apache.geode.internal.cache.tier.sockets;
-import java.io.ByteArrayInputStream;
import java.io.DataInput;
-import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
@@ -34,6 +32,7 @@ import org.apache.geode.GemFireIOException;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.cache.query.internal.cq.InternalCqQuery;
import org.apache.geode.cache.util.ObjectSizer;
+import org.apache.geode.internal.ByteArrayDataInput;
import org.apache.geode.internal.DSCODE;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.Sendable;
@@ -1219,7 +1218,7 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
// This is a debugging method so ignore all exceptions like
// ClassNotFoundException
try {
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(serializedBytes));
+ ByteArrayDataInput dis = new ByteArrayDataInput(serializedBytes);
deserializedObject = DataSerializer.readObject(dis);
} catch (Exception e) {
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/EncryptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/EncryptorImpl.java
index 66d6180..abe74fa 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/EncryptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/EncryptorImpl.java
@@ -23,7 +23,6 @@ import static org.apache.geode.internal.cache.tier.sockets.Handshake.PUBLIC_KEY_
import static org.apache.geode.internal.cache.tier.sockets.Handshake.REPLY_AUTH_NOT_REQUIRED;
import static org.apache.geode.internal.cache.tier.sockets.Handshake.REPLY_OK;
-import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileInputStream;
@@ -66,6 +65,7 @@ import org.apache.geode.LogWriter;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.internal.ByteArrayDataInput;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.tier.Encryptor;
@@ -582,8 +582,7 @@ public class EncryptorImpl implements Encryptor {
byte[] encBytes = DataSerializer.readByteArray(dis);
Cipher c = getDecryptCipher(this.clientSKAlgo, this.clientPublicKey);
byte[] credentialBytes = decryptBytes(encBytes, c);
- ByteArrayInputStream bis = new ByteArrayInputStream(credentialBytes);
- DataInputStream dinp = new DataInputStream(bis);
+ ByteArrayDataInput dinp = new ByteArrayDataInput(credentialBytes);
// credentials = DataSerializer.readProperties(dinp);//Hitesh: we don't send in handshake
// now
byte[] challengeRes = DataSerializer.readByteArray(dinp);
@@ -687,8 +686,7 @@ public class EncryptorImpl implements Encryptor {
}
byte[] credentialBytes = decrypt.doFinal(encBytes);
- ByteArrayInputStream bis = new ByteArrayInputStream(credentialBytes);
- DataInputStream dinp = new DataInputStream(bis);
+ ByteArrayDataInput dinp = new ByteArrayDataInput(credentialBytes);
credentials = DataSerializer.readProperties(dinp);
byte[] challengeRes = DataSerializer.readByteArray(dinp);
// Check the challenge string
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index aeedd9e..d433b81 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -18,8 +18,6 @@ import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIE
import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_ACCESSOR_PP;
import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_AUTHENTICATOR;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
@@ -55,6 +53,7 @@ import org.apache.geode.cache.UnsupportedVersionException;
import org.apache.geode.cache.client.internal.Connection;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.ByteArrayDataInput;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.EventID;
@@ -1070,8 +1069,7 @@ public abstract class ServerConnection implements Runnable {
credBytes = handshake.getEncryptor().decryptBytes(credBytes);
- ByteArrayInputStream bis = new ByteArrayInputStream(credBytes);
- DataInputStream dinp = new DataInputStream(bis);
+ ByteArrayDataInput dinp = new ByteArrayDataInput(credBytes);
Properties credentials = DataSerializer.readProperties(dinp);
// When here, security is enforced on server, if login returns a subject, then it's the newly
diff --git a/geode-core/src/main/java/org/apache/geode/internal/offheap/DataType.java b/geode-core/src/main/java/org/apache/geode/internal/offheap/DataType.java
index 35183a5..e4c495f 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/offheap/DataType.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/offheap/DataType.java
@@ -14,13 +14,12 @@
*/
package org.apache.geode.internal.offheap;
-import java.io.ByteArrayInputStream;
import java.io.DataInput;
-import java.io.DataInputStream;
import java.io.IOException;
import org.apache.geode.DataSerializer;
import org.apache.geode.Instantiator;
+import org.apache.geode.internal.ByteArrayDataInput;
import org.apache.geode.internal.DSCODE;
import org.apache.geode.internal.DSFIDFactory;
import org.apache.geode.internal.InternalDataSerializer;
@@ -280,6 +279,6 @@ public class DataType {
}
public static DataInput getDataInput(byte[] bytes) {
- return new DataInputStream(new ByteArrayInputStream(bytes));
+ return new ByteArrayDataInput(bytes);
}
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/ByteArrayDataInputTest.java b/geode-core/src/test/java/org/apache/geode/internal/ByteArrayDataInputTest.java
new file mode 100644
index 0000000..5a89dd0
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/ByteArrayDataInputTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
+
+import java.io.DataInput;
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.junit.Test;
+
+public class ByteArrayDataInputTest {
+ @Test
+ public void readFullyThatReadsPastEndOfDataThrowsEOFException() throws IOException {
+ byte[] inputBytes = new byte[1];
+ DataInput input = createDataInput(inputBytes);
+ byte[] outputBytes = new byte[2];
+
+ Throwable t = catchThrowable(() -> input.readFully(outputBytes));
+
+ assertThat(t).isInstanceOf(EOFException.class);
+ }
+
+ @Test
+ public void readLineGivenInputAtEOFReturnsNull() throws IOException {
+ byte[] inputBytes = new byte[1];
+ DataInput input = createDataInput(inputBytes);
+ input.readByte();
+
+ String result = input.readLine();
+
+ assertThat(result).isNull();
+ assertThat(dataRemaining(input)).isEqualTo(0);
+ }
+
+ @Test
+ public void readLineGivenEmptyLineTerminatedByLineFeedReturnsEmptyString() throws IOException {
+ byte[] inputBytes = new byte[] {'\n'};
+ DataInput input = createDataInput(inputBytes);
+
+ String result = input.readLine();
+
+ assertThat(result).isEmpty();
+ assertThat(dataRemaining(input)).isEqualTo(0);
+ }
+
+ @Test
+ public void readLineGivenEmptyLineTerminatedByLineFeedReturnsEmptyStringDoesNotConsumeNextByte()
+ throws IOException {
+ byte[] inputBytes = new byte[] {'\n', 'a'};
+ DataInput input = createDataInput(inputBytes);
+
+ String result = input.readLine();
+
+ assertThat(result).isEmpty();
+ assertThat(dataRemaining(input)).isEqualTo(1);
+ assertThat(input.readUnsignedByte()).isEqualTo((int) 'a');
+ }
+
+ @Test
+ public void readLineGivenEmptyLineTerminatedByCarriageReturnReturnsEmptyString()
+ throws IOException {
+ byte[] inputBytes = new byte[] {'\r'};
+ DataInput input = createDataInput(inputBytes);
+
+ String result = input.readLine();
+
+ assertThat(result).isEmpty();
+ assertThat(dataRemaining(input)).isEqualTo(0);
+ }
+
+ @Test
+ public void readLineGivenEmptyLineTerminatedByCarriageReturnReturnsEmptyStringAndDoesNotConsumeNextByte()
+ throws IOException {
+ byte[] inputBytes = new byte[] {'\r', 'a'};
+ DataInput input = createDataInput(inputBytes);
+
+ String result = input.readLine();
+
+ assertThat(result).isEmpty();
+ assertThat(dataRemaining(input)).isEqualTo(1);
+ assertThat(input.readUnsignedByte()).isEqualTo((int) 'a');
+ }
+
+ @Test
+ public void readLineGivenEmptyLineTerminatedByCarriageReturnLineFeedReturnsEmptyString()
+ throws IOException {
+ byte[] inputBytes = new byte[] {'\r', '\n'};
+ DataInput input = createDataInput(inputBytes);
+
+ String result = input.readLine();
+
+ assertThat(result).isEmpty();
+ assertThat(dataRemaining(input)).isEqualTo(0);
+ }
+
+ @Test
+ public void readLineGivenEmptyLineTerminatedByCarriageReturnLineFeedReturnsEmptyStringDoesNotConsumeNextByte()
+ throws IOException {
+ byte[] inputBytes = new byte[] {'\r', '\n', 'a'};
+ DataInput input = createDataInput(inputBytes);
+
+ String result = input.readLine();
+
+ assertThat(result).isEmpty();
+ assertThat(dataRemaining(input)).isEqualTo(1);
+ assertThat(input.readUnsignedByte()).isEqualTo((int) 'a');
+ }
+
+ @Test
+ public void readLineGivenLineTerminatedByEOFReturnsCorrectLineData() throws IOException {
+ byte[] inputBytes = new byte[] {'a', 'b', 'c'};
+ DataInput input = createDataInput(inputBytes);
+
+ String result = input.readLine();
+
+ assertThat(result).isEqualTo("abc");
+ assertThat(dataRemaining(input)).isEqualTo(0);
+ }
+
+ @Test
+ public void readLineGivenLineTerminatedByLineFeedReturnsCorrectLineData() throws IOException {
+ byte[] inputBytes = new byte[] {'a', 'b', 'c', '\n', '2'};
+ DataInput input = createDataInput(inputBytes);
+
+ String result = input.readLine();
+
+ assertThat(result).isEqualTo("abc");
+ assertThat(dataRemaining(input)).isEqualTo(1);
+ assertThat(input.readUnsignedByte()).isEqualTo((int) '2');
+ }
+
+ @Test
+ public void readLineGivenLineTerminatedByCarriageReturnReturnsCorrectLineData()
+ throws IOException {
+ byte[] inputBytes = new byte[] {'a', 'b', 'c', '\r', '2'};
+ DataInput input = createDataInput(inputBytes);
+
+ String result = input.readLine();
+
+ assertThat(result).isEqualTo("abc");
+ assertThat(dataRemaining(input)).isEqualTo(1);
+ assertThat(input.readUnsignedByte()).isEqualTo((int) '2');
+ }
+
+ @Test
+ public void readLineGivenLineTerminatedByCarriageReturnLineFeedReturnsCorrectLineData()
+ throws IOException {
+ byte[] inputBytes = new byte[] {'a', 'b', 'c', '\r', '\n', '2'};
+ DataInput input = createDataInput(inputBytes);
+
+ String result = input.readLine();
+
+ assertThat(result).isEqualTo("abc");
+ assertThat(dataRemaining(input)).isEqualTo(1);
+ assertThat(input.readUnsignedByte()).isEqualTo((int) '2');
+ }
+
+ /**
+ * We want ByteArrayDataInput to behave like DataInputStream(ByteArrayInputStream).
+ * This boolean allows us to switch back and forth in this test to make sure
+ * they both behave the same. It should never be checked in with testJDK=true.
+ */
+ private boolean testJDK = false;
+
+ private DataInput createDataInput(byte[] inputBytes) {
+ if (testJDK) {
+ return new java.io.DataInputStream(new java.io.ByteArrayInputStream(inputBytes));
+ } else {
+ ByteArrayDataInput input = new ByteArrayDataInput();
+ input.initialize(inputBytes, null);
+ return input;
+ }
+ }
+
+ private int dataRemaining(DataInput dataInput) {
+ if (testJDK) {
+ try {
+ return ((java.io.DataInputStream) dataInput).available();
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ } else {
+ return ((ByteArrayDataInput) dataInput).available();
+ }
+ }
+}