You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bu...@apache.org on 2021/11/10 00:08:19 UTC

[geode] branch support/1.13 updated: GEODE-8542: java.lang.IllegalStateException: tcp message exceeded max… (#5562)

This is an automated email from the ASF dual-hosted git repository.

burcham pushed a commit to branch support/1.13
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.13 by this push:
     new 653e8cd  GEODE-8542: java.lang.IllegalStateException: tcp message exceeded max… (#5562)
653e8cd is described below

commit 653e8cde775bb3f97f21c91017a12a809a54b51e
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Tue Sep 29 10:17:56 2020 -0700

    GEODE-8542: java.lang.IllegalStateException: tcp message exceeded max… (#5562)
    
    Limit the size of message chunks to the maximum message size allowed
    by org.apache.geode.internal.tcp.Connection.
    
    (cherry picked from commit b439d3301dc15a81a9917b05ca4bd0717d1718bc)
---
 .../org/apache/geode/internal/tcp/MsgStreamer.java |  9 ++++++-
 .../apache/geode/internal/tcp/MsgStreamerTest.java | 29 ++++++++++++++++++++--
 2 files changed, 35 insertions(+), 3 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
index 603d2bd..6157928 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
@@ -27,6 +27,7 @@ import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
 import it.unimi.dsi.fastutil.objects.ObjectIterator;
 
 import org.apache.geode.DataSerializer;
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.internal.Assert;
@@ -129,7 +130,8 @@ public class MsgStreamer extends OutputStream
     this.stats = stats;
     this.msg = msg;
     this.cons = cons;
-    this.buffer = bufferPool.acquireDirectSenderBuffer(sendBufferSize);
+    int bufferSize = Math.min(sendBufferSize, Connection.MAX_MSG_SIZE);
+    this.buffer = bufferPool.acquireDirectSenderBuffer(bufferSize);
     this.buffer.clear();
     this.buffer.position(Connection.MSG_HEADER_BYTES);
     this.msgId = MsgIdGenerator.NO_MSG_ID;
@@ -351,6 +353,11 @@ public class MsgStreamer extends OutputStream
     this.buffer.position(Connection.MSG_HEADER_BYTES);
   }
 
+  @VisibleForTesting
+  protected ByteBuffer getBuffer() {
+    return buffer;
+  }
+
   @Override
   public void close() throws IOException {
     try {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/MsgStreamerTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/MsgStreamerTest.java
index 1631b08..d1f1509 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/tcp/MsgStreamerTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/MsgStreamerTest.java
@@ -77,6 +77,31 @@ public class MsgStreamerTest {
     verify(pool, times(2)).releaseSenderBuffer(isA(ByteBuffer.class));
   }
 
+  @Test
+  public void streamerRespectsMaxMessageSize() {
+    InternalDistributedMember member1;
+    member1 = new InternalDistributedMember("localhost", 1234);
+
+    DistributionMessage message = new SerialAckedMessage();
+    message.setRecipients(Arrays.asList(member1));
+
+    when(connection1.getRemoteAddress()).thenReturn(member1);
+    when(connection1.getRemoteVersion()).thenReturn(Version.CURRENT);
+    // create a streamer for a Connection that has a buffer size that's larger than the
+    // biggest message we can actually send. This is picked up by the MsgStreamer to allocate
+    // a buffer
+    when(connection1.getSendBufferSize()).thenReturn(Connection.MAX_MSG_SIZE + 1);
+    List<Connection> connections = Arrays.asList(connection1);
+
+    final BaseMsgStreamer msgStreamer =
+        MsgStreamer.create(connections, message, false, stats, pool);
+    // the streamer ought to have limited the message buffer to MAX_MSG_SIZE
+    assertThat(((MsgStreamer) msgStreamer).getBuffer().capacity())
+        .isEqualTo(Connection.MAX_MSG_SIZE);
+  }
+
+
+
   protected BaseMsgStreamer createMsgStreamer(boolean mixedDestinationVersions) {
 
     InternalDistributedMember member1, member2;
@@ -92,9 +117,9 @@ public class MsgStreamerTest {
     when(connection2.getRemoteAddress()).thenReturn(member2);
     when(connection2.getSendBufferSize()).thenReturn(Connection.SMALL_BUFFER_SIZE);
     if (mixedDestinationVersions) {
-      when(connection1.getRemoteVersion()).thenReturn(Version.GEODE_1_12_0);
+      when(connection2.getRemoteVersion()).thenReturn(Version.GEODE_1_12_0);
     } else {
-      when(connection1.getRemoteVersion()).thenReturn(Version.CURRENT);
+      when(connection2.getRemoteVersion()).thenReturn(Version.CURRENT);
     }
     List<Connection> connections = Arrays.asList(connection1, connection2);