You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bu...@apache.org on 2021/11/10 00:08:19 UTC
[geode] branch support/1.13 updated: GEODE-8542: java.lang.IllegalStateException: tcp message exceeded max… (#5562)
This is an automated email from the ASF dual-hosted git repository.
burcham pushed a commit to branch support/1.13
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.13 by this push:
new 653e8cd GEODE-8542: java.lang.IllegalStateException: tcp message exceeded max… (#5562)
653e8cd is described below
commit 653e8cde775bb3f97f21c91017a12a809a54b51e
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Tue Sep 29 10:17:56 2020 -0700
GEODE-8542: java.lang.IllegalStateException: tcp message exceeded max… (#5562)
Limit the size of message chunks to the maximum message size allowed
by org.apache.geode.internal.tcp.Connection.
(cherry picked from commit b439d3301dc15a81a9917b05ca4bd0717d1718bc)
---
.../org/apache/geode/internal/tcp/MsgStreamer.java | 9 ++++++-
.../apache/geode/internal/tcp/MsgStreamerTest.java | 29 ++++++++++++++++++++--
2 files changed, 35 insertions(+), 3 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
index 603d2bd..6157928 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
@@ -27,6 +27,7 @@ import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.objects.ObjectIterator;
import org.apache.geode.DataSerializer;
+import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.internal.Assert;
@@ -129,7 +130,8 @@ public class MsgStreamer extends OutputStream
this.stats = stats;
this.msg = msg;
this.cons = cons;
- this.buffer = bufferPool.acquireDirectSenderBuffer(sendBufferSize);
+ int bufferSize = Math.min(sendBufferSize, Connection.MAX_MSG_SIZE);
+ this.buffer = bufferPool.acquireDirectSenderBuffer(bufferSize);
this.buffer.clear();
this.buffer.position(Connection.MSG_HEADER_BYTES);
this.msgId = MsgIdGenerator.NO_MSG_ID;
@@ -351,6 +353,11 @@ public class MsgStreamer extends OutputStream
this.buffer.position(Connection.MSG_HEADER_BYTES);
}
+ @VisibleForTesting
+ protected ByteBuffer getBuffer() {
+ return buffer;
+ }
+
@Override
public void close() throws IOException {
try {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/MsgStreamerTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/MsgStreamerTest.java
index 1631b08..d1f1509 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/tcp/MsgStreamerTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/MsgStreamerTest.java
@@ -77,6 +77,31 @@ public class MsgStreamerTest {
verify(pool, times(2)).releaseSenderBuffer(isA(ByteBuffer.class));
}
+ @Test
+ public void streamerRespectsMaxMessageSize() {
+ InternalDistributedMember member1;
+ member1 = new InternalDistributedMember("localhost", 1234);
+
+ DistributionMessage message = new SerialAckedMessage();
+ message.setRecipients(Arrays.asList(member1));
+
+ when(connection1.getRemoteAddress()).thenReturn(member1);
+ when(connection1.getRemoteVersion()).thenReturn(Version.CURRENT);
+ // create a streamer for a Connection that has a buffer size that's larger than the
+ // biggest message we can actually send. This is picked up by the MsgStreamer to allocate
+ // a buffer
+ when(connection1.getSendBufferSize()).thenReturn(Connection.MAX_MSG_SIZE + 1);
+ List<Connection> connections = Arrays.asList(connection1);
+
+ final BaseMsgStreamer msgStreamer =
+ MsgStreamer.create(connections, message, false, stats, pool);
+ // the streamer ought to have limited the message buffer to MAX_MSG_SIZE
+ assertThat(((MsgStreamer) msgStreamer).getBuffer().capacity())
+ .isEqualTo(Connection.MAX_MSG_SIZE);
+ }
+
+
+
protected BaseMsgStreamer createMsgStreamer(boolean mixedDestinationVersions) {
InternalDistributedMember member1, member2;
@@ -92,9 +117,9 @@ public class MsgStreamerTest {
when(connection2.getRemoteAddress()).thenReturn(member2);
when(connection2.getSendBufferSize()).thenReturn(Connection.SMALL_BUFFER_SIZE);
if (mixedDestinationVersions) {
- when(connection1.getRemoteVersion()).thenReturn(Version.GEODE_1_12_0);
+ when(connection2.getRemoteVersion()).thenReturn(Version.GEODE_1_12_0);
} else {
- when(connection1.getRemoteVersion()).thenReturn(Version.CURRENT);
+ when(connection2.getRemoteVersion()).thenReturn(Version.CURRENT);
}
List<Connection> connections = Arrays.asList(connection1, connection2);