You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bu...@apache.org on 2021/11/10 00:10:02 UTC
[geode] branch support/1.12 updated: GEODE-8542: java.lang.IllegalStateException: tcp message exceeded max… (#5562)
This is an automated email from the ASF dual-hosted git repository.
burcham pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.12 by this push:
new 26f7c17 GEODE-8542: java.lang.IllegalStateException: tcp message exceeded max… (#5562)
26f7c17 is described below
commit 26f7c17b49b4e83f2af40fca66c8719658a53bf2
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Tue Sep 29 10:17:56 2020 -0700
GEODE-8542: java.lang.IllegalStateException: tcp message exceeded max… (#5562)
Limit the size of message chunks to the maximum message size allowed
by org.apache.geode.internal.tcp.Connection.
(cherry picked from commit b439d3301dc15a81a9917b05ca4bd0717d1718bc)
(cherry picked from commit cb07f831b1ce1023608e59f873b015f1ae2768bc)
---
.../org/apache/geode/internal/tcp/MsgStreamer.java | 9 +-
.../apache/geode/internal/tcp/MsgStreamerTest.java | 129 +++++++++++++++++++++
2 files changed, 137 insertions(+), 1 deletion(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
index ed25ce3..69f8047 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
@@ -27,6 +27,7 @@ import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.objects.ObjectIterator;
import org.apache.geode.DataSerializer;
+import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.internal.Assert;
@@ -129,7 +130,8 @@ public class MsgStreamer extends OutputStream
this.stats = stats;
this.msg = msg;
this.cons = cons;
- this.buffer = bufferPool.acquireDirectSenderBuffer(sendBufferSize);
+ int bufferSize = Math.min(sendBufferSize, Connection.MAX_MSG_SIZE);
+ this.buffer = bufferPool.acquireDirectSenderBuffer(bufferSize);
this.buffer.clear();
this.buffer.position(Connection.MSG_HEADER_BYTES);
this.msgId = MsgIdGenerator.NO_MSG_ID;
@@ -347,6 +349,11 @@ public class MsgStreamer extends OutputStream
this.buffer.position(Connection.MSG_HEADER_BYTES);
}
+ @VisibleForTesting
+ protected ByteBuffer getBuffer() {
+ return buffer;
+ }
+
@Override
public void close() throws IOException {
try {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/MsgStreamerTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/MsgStreamerTest.java
new file mode 100644
index 0000000..22f5756
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/MsgStreamerTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.tcp;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+
+import javax.net.ssl.SSLException;
+
+import org.junit.Test;
+
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.SerialAckedMessage;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.net.BufferPool;
+import org.apache.geode.internal.serialization.Version;
+
+public class MsgStreamerTest {
+ private DMStats stats = mock(DMStats.class);
+ private BufferPool pool = spy(new BufferPool(stats));
+ Connection connection1 = mock(Connection.class);
+ Connection connection2 = mock(Connection.class);
+
+ // This test relies on GEODE-8020 fix, which has not yet been back-ported to 1.12
+ // @Test
+ // public void create() {
+ // final BaseMsgStreamer msgStreamer = createMsgStreamer(false);
+ // assertThat(msgStreamer).isInstanceOf(MsgStreamer.class);
+ // }
+
+ @Test
+ public void createWithMixedVersions() {
+ final BaseMsgStreamer msgStreamer = createMsgStreamer(true);
+ assertThat(msgStreamer).isInstanceOf(MsgStreamerList.class);
+ }
+
+ @Test
+ public void streamerListRelease() throws IOException {
+ final MsgStreamerList msgStreamer = (MsgStreamerList) createMsgStreamer(true);
+ msgStreamer.writeMessage();
+ verify(pool, times(2)).releaseSenderBuffer(isA(ByteBuffer.class));
+ }
+
+ @Test
+ public void streamerListReleaseWithException() throws IOException {
+ final MsgStreamerList msgStreamer = (MsgStreamerList) createMsgStreamer(true);
+ // if the first streamer throws an exception while writing the message we should still only
+ // release two buffers (one for each streamer)
+ doThrow(new SSLException("")).when(connection1).sendPreserialized(any(ByteBuffer.class),
+ any(Boolean.class), any(DistributionMessage.class));
+ msgStreamer.writeMessage();
+ verify(pool, times(2)).releaseSenderBuffer(isA(ByteBuffer.class));
+ }
+
+ @Test
+ public void streamerRespectsMaxMessageSize() {
+ InternalDistributedMember member1;
+ member1 = new InternalDistributedMember("localhost", 1234);
+
+ DistributionMessage message = new SerialAckedMessage();
+ message.setRecipients(Arrays.asList(member1));
+
+ when(connection1.getRemoteAddress()).thenReturn(member1);
+ when(connection1.getRemoteVersion()).thenReturn(Version.CURRENT);
+ // create a streamer for a Connection that has a buffer size that's larger than the
+ // biggest message we can actually send. This is picked up by the MsgStreamer to allocate
+ // a buffer
+ when(connection1.getSendBufferSize()).thenReturn(Connection.MAX_MSG_SIZE + 1);
+ List<Connection> connections = Arrays.asList(connection1);
+
+ final BaseMsgStreamer msgStreamer =
+ MsgStreamer.create(connections, message, false, stats, pool);
+ // the streamer ought to have limited the message buffer to MAX_MSG_SIZE
+ assertThat(((MsgStreamer) msgStreamer).getBuffer().capacity())
+ .isEqualTo(Connection.MAX_MSG_SIZE);
+ }
+
+
+
+ protected BaseMsgStreamer createMsgStreamer(boolean mixedDestinationVersions) {
+
+ InternalDistributedMember member1, member2;
+ member1 = new InternalDistributedMember("localhost", 1234);
+ member2 = new InternalDistributedMember("localhost", 2345);
+
+ DistributionMessage message = new SerialAckedMessage();
+ message.setRecipients(Arrays.asList(member1, member2));
+
+ when(connection1.getRemoteAddress()).thenReturn(member1);
+ when(connection1.getRemoteVersion()).thenReturn(Version.CURRENT);
+ when(connection1.getSendBufferSize()).thenReturn(Connection.SMALL_BUFFER_SIZE);
+ when(connection2.getRemoteAddress()).thenReturn(member2);
+ when(connection2.getSendBufferSize()).thenReturn(Connection.SMALL_BUFFER_SIZE);
+ if (mixedDestinationVersions) {
+ when(connection2.getRemoteVersion()).thenReturn(Version.GEODE_1_12_0);
+ } else {
+ when(connection2.getRemoteVersion()).thenReturn(Version.CURRENT);
+ }
+ List<Connection> connections = Arrays.asList(connection1, connection2);
+
+ return MsgStreamer.create(connections, message, false, stats, pool);
+ }
+}