You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bu...@apache.org on 2021/11/10 00:10:02 UTC

[geode] branch support/1.12 updated: GEODE-8542: java.lang.IllegalStateException: tcp message exceeded max… (#5562)

This is an automated email from the ASF dual-hosted git repository.

burcham pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.12 by this push:
     new 26f7c17  GEODE-8542: java.lang.IllegalStateException: tcp message exceeded max… (#5562)
26f7c17 is described below

commit 26f7c17b49b4e83f2af40fca66c8719658a53bf2
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Tue Sep 29 10:17:56 2020 -0700

    GEODE-8542: java.lang.IllegalStateException: tcp message exceeded max… (#5562)
    
    Limit the size of message chunks to the maximum message size allowed
    by org.apache.geode.internal.tcp.Connection.
    
    (cherry picked from commit b439d3301dc15a81a9917b05ca4bd0717d1718bc)
    (cherry picked from commit cb07f831b1ce1023608e59f873b015f1ae2768bc)
---
 .../org/apache/geode/internal/tcp/MsgStreamer.java |   9 +-
 .../apache/geode/internal/tcp/MsgStreamerTest.java | 129 +++++++++++++++++++++
 2 files changed, 137 insertions(+), 1 deletion(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
index ed25ce3..69f8047 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
@@ -27,6 +27,7 @@ import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
 import it.unimi.dsi.fastutil.objects.ObjectIterator;
 
 import org.apache.geode.DataSerializer;
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.internal.Assert;
@@ -129,7 +130,8 @@ public class MsgStreamer extends OutputStream
     this.stats = stats;
     this.msg = msg;
     this.cons = cons;
-    this.buffer = bufferPool.acquireDirectSenderBuffer(sendBufferSize);
+    int bufferSize = Math.min(sendBufferSize, Connection.MAX_MSG_SIZE);
+    this.buffer = bufferPool.acquireDirectSenderBuffer(bufferSize);
     this.buffer.clear();
     this.buffer.position(Connection.MSG_HEADER_BYTES);
     this.msgId = MsgIdGenerator.NO_MSG_ID;
@@ -347,6 +349,11 @@ public class MsgStreamer extends OutputStream
     this.buffer.position(Connection.MSG_HEADER_BYTES);
   }
 
+  @VisibleForTesting
+  protected ByteBuffer getBuffer() { // exposes this streamer's buffer so tests can inspect it
+    return buffer;
+  }
+
   @Override
   public void close() throws IOException {
     try {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/MsgStreamerTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/MsgStreamerTest.java
new file mode 100644
index 0000000..22f5756
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/MsgStreamerTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.tcp;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+
+import javax.net.ssl.SSLException;
+
+import org.junit.Test;
+
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.SerialAckedMessage;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.net.BufferPool;
+import org.apache.geode.internal.serialization.Version;
+
+public class MsgStreamerTest {
+  private DMStats stats = mock(DMStats.class);
+  private BufferPool pool = spy(new BufferPool(stats));
+  Connection connection1 = mock(Connection.class);
+  Connection connection2 = mock(Connection.class);
+
+  // This test relies on the GEODE-8020 fix, which has not yet been back-ported to 1.12
+  // @Test
+  // public void create() {
+  // final BaseMsgStreamer msgStreamer = createMsgStreamer(false);
+  // assertThat(msgStreamer).isInstanceOf(MsgStreamer.class);
+  // }
+
+  @Test
+  public void createWithMixedVersions() {
+    final BaseMsgStreamer msgStreamer = createMsgStreamer(true);
+    assertThat(msgStreamer).isInstanceOf(MsgStreamerList.class);
+  }
+
+  @Test
+  public void streamerListRelease() throws IOException {
+    final MsgStreamerList msgStreamer = (MsgStreamerList) createMsgStreamer(true);
+    msgStreamer.writeMessage();
+    verify(pool, times(2)).releaseSenderBuffer(isA(ByteBuffer.class));
+  }
+
+  @Test
+  public void streamerListReleaseWithException() throws IOException {
+    final MsgStreamerList msgStreamer = (MsgStreamerList) createMsgStreamer(true);
+    // even if the first connection throws an SSLException while sending the message we should
+    // still release exactly two buffers (one for each streamer), no more and no fewer
+    doThrow(new SSLException("")).when(connection1).sendPreserialized(any(ByteBuffer.class),
+        any(Boolean.class), any(DistributionMessage.class));
+    msgStreamer.writeMessage();
+    verify(pool, times(2)).releaseSenderBuffer(isA(ByteBuffer.class));
+  }
+
+  @Test
+  public void streamerRespectsMaxMessageSize() {
+    InternalDistributedMember member1;
+    member1 = new InternalDistributedMember("localhost", 1234);
+
+    DistributionMessage message = new SerialAckedMessage();
+    message.setRecipients(Arrays.asList(member1));
+
+    when(connection1.getRemoteAddress()).thenReturn(member1);
+    when(connection1.getRemoteVersion()).thenReturn(Version.CURRENT);
+    // stub the Connection to report a send-buffer size of MAX_MSG_SIZE + 1, i.e. larger than
+    // the biggest message we can actually send. This is the size the MsgStreamer picks up
+    // when it allocates its buffer
+    when(connection1.getSendBufferSize()).thenReturn(Connection.MAX_MSG_SIZE + 1);
+    List<Connection> connections = Arrays.asList(connection1);
+
+    final BaseMsgStreamer msgStreamer =
+        MsgStreamer.create(connections, message, false, stats, pool);
+    // the streamer ought to have capped its buffer capacity at Connection.MAX_MSG_SIZE
+    assertThat(((MsgStreamer) msgStreamer).getBuffer().capacity())
+        .isEqualTo(Connection.MAX_MSG_SIZE);
+  }
+
+
+
+  protected BaseMsgStreamer createMsgStreamer(boolean mixedDestinationVersions) {
+
+    InternalDistributedMember member1, member2;
+    member1 = new InternalDistributedMember("localhost", 1234);
+    member2 = new InternalDistributedMember("localhost", 2345);
+
+    DistributionMessage message = new SerialAckedMessage();
+    message.setRecipients(Arrays.asList(member1, member2));
+
+    when(connection1.getRemoteAddress()).thenReturn(member1);
+    when(connection1.getRemoteVersion()).thenReturn(Version.CURRENT);
+    when(connection1.getSendBufferSize()).thenReturn(Connection.SMALL_BUFFER_SIZE);
+    when(connection2.getRemoteAddress()).thenReturn(member2);
+    when(connection2.getSendBufferSize()).thenReturn(Connection.SMALL_BUFFER_SIZE);
+    if (mixedDestinationVersions) {
+      when(connection2.getRemoteVersion()).thenReturn(Version.GEODE_1_12_0);
+    } else {
+      when(connection2.getRemoteVersion()).thenReturn(Version.CURRENT);
+    }
+    List<Connection> connections = Arrays.asList(connection1, connection2);
+
+    return MsgStreamer.create(connections, message, false, stats, pool);
+  }
+}